Spark apparently now supports GPUs, so I tried it out

*Note: I don't have a cluster environment; this only checks that the GPU works, so keep that in mind.

Environment

  • OS: Ubuntu
  • CPU (x1): Intel(R) Core(TM) i9-9900K CPU @ 3.60GHz
  • GPU (x1): GeForce 2080 Ti
  • Memory: 64GB

Spark, the large-scale data processing engine, now supports GPUs

Since I don't have a cluster environment, I kept this to a quick check.

The guide below describes a simple way to try it out, so I set up an environment following it.

https://nvidia.github.io/spark-rapids/docs/get-started/getting-started.html

Environment setup

Spark supports Java 8 and Java 11, so install a JDK:

sudo apt install openjdk-8-jdk-headless

GPU setup

Install the GPU software following the CUDA installation instructions: download and install the GPU driver and the CUDA toolkit.

https://developer.nvidia.com/cuda-10.1-download-archive-base

wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64/cuda-ubuntu1804.pin
sudo mv cuda-ubuntu1804.pin /etc/apt/preferences.d/cuda-repository-pin-600
wget http://developer.download.nvidia.com/compute/cuda/10.1/Prod/local_installers/cuda-repo-ubuntu1804-10-1-local-10.1.243-418.87.00_1.0-1_amd64.deb
sudo dpkg -i cuda-repo-ubuntu1804-10-1-local-10.1.243-418.87.00_1.0-1_amd64.deb
sudo apt-key add /var/cuda-repo-10-1-local-10.1.243-418.87.00/7fa2af80.pub
sudo apt-get update
sudo apt-get -y install cuda

Spark setup

Go to the following page.

https://spark.apache.org/downloads.html

Select the release and package type so that the download link reads Download Spark: spark-3.0.0-bin-hadoop2.7.tgz, then download it.

Extract it with the following command:

tar xvzf spark-3.0.0-bin-hadoop2.7.tgz 

Because we picked a pre-built package, the binaries are already compiled and we can try it right away.

Set $SPARK_HOME so the later commands are easier to run.

export SPARK_HOME=$HOME/spark-3.0.0-bin-hadoop2.7

Download the jar files that let Spark use the GPU.

https://nvidia.github.io/spark-rapids/docs/version/stable-release.html#download

They are listed in the "Download" section of the page; grab the RAPIDS Spark jar and the cuDF jar that matches your CUDA version.

Create a directory for the Spark plugin and move the downloaded jar files into it.

sudo mkdir /opt/sparkRapidsPlugin
sudo mv ~/Downloads/rapids-4-spark_2.12-0.1.0.jar /opt/sparkRapidsPlugin/
sudo mv ~/Downloads/cudf-0.14-cuda10-1.jar /opt/sparkRapidsPlugin/

Set environment variables pointing at the jars.

export SPARK_RAPIDS_DIR=/opt/sparkRapidsPlugin
export SPARK_CUDF_JAR=${SPARK_RAPIDS_DIR}/cudf-0.14-cuda10-1.jar
export SPARK_RAPIDS_PLUGIN_JAR=${SPARK_RAPIDS_DIR}/rapids-4-spark_2.12-0.1.0.jar

Testing

Running on the CPU

Check that it runs on the local CPU with the following command:

$SPARK_HOME/bin/spark-shell \
    --master local \
    --num-executors 1 \
    --conf spark.executor.cores=1 \
    --driver-memory 10g \
    --conf spark.locality.wait=0s \
    --conf spark.sql.files.maxPartitionBytes=512m \
    --conf spark.sql.shuffle.partitions=10

If it starts successfully, you will see output like the following and can enter Scala code.

20/09/04 14:29:51 WARN Utils: Your hostname, mogushi-desktop resolves to a loopback address: 127.0.1.1; using 192.168.1.20 instead (on interface eno1)
20/09/04 14:29:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/mogushi/spark/spark-3.0.0-bin-hadoop2.7/jars/spark-unsafe_2.12-3.0.0.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
20/09/04 14:29:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.1.20:4040
Spark context available as 'sc' (master = local, app id = local-1599197394685).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0
      /_/
         
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 11.0.8)
Type in expressions to have them evaluated.
Type :help for more information.
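
If the prompt appears, a one-line sanity check (my own addition, not part of the referenced guide) confirms the session responds before running anything heavier:

scala> spark.range(10).count   // should return 10 almost instantly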

To have a baseline to compare against the GPU, create a dummy dataset and run a join. The join is run twice: once as a warm-up and once for the actual timing.

The timed run took 5801 ms.

scala> val df = sc.makeRDD(1 to 10000000, 6).toDF
df: org.apache.spark.sql.DataFrame = [value: int]

scala> val df2 = sc.makeRDD(1 to 10000000, 6).toDF
df2: org.apache.spark.sql.DataFrame = [value: int]

scala> df.select( $"value" as "a").join(df2.select($"value" as "b"), $"a" === $"b").count
res0: Long = 10000000                                                           

scala> val start = System.currentTimeMillis; df.select( $"value" as "a").join(df2.select($"value" as "b"), $"a" === $"b").count; println(System.currentTimeMillis - start);
5801                                                                            
start: Long = 1599197797794
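
As a side note, the System.currentTimeMillis boilerplate can be wrapped in a small helper. This is just a sketch I added for convenience (paste it into the shell with :paste), not something from the guide:

def time[A](block: => A): A = {
  val start = System.currentTimeMillis            // record the start time
  val result = block                               // evaluate the expression being measured
  println(s"elapsed: ${System.currentTimeMillis - start} ms")
  result
}

// usage: time { df.select($"value" as "a").join(df2.select($"value" as "b"), $"a" === $"b").count }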

Running on the GPU

Check that it runs locally with the following command:

$SPARK_HOME/bin/spark-shell \
    --master local \
    --num-executors 1 \
    --conf spark.executor.cores=1 \
    --conf spark.rapids.sql.concurrentGpuTasks=1 \
    --driver-memory 10g \
    --conf spark.rapids.memory.pinnedPool.size=9G \
    --conf spark.locality.wait=0s \
    --conf spark.sql.files.maxPartitionBytes=512m \
    --conf spark.sql.shuffle.partitions=10 \
    --conf spark.plugins=com.nvidia.spark.SQLPlugin \
    --jars ${SPARK_CUDF_JAR},${SPARK_RAPIDS_PLUGIN_JAR}

If it works, you should see something like this:

20/09/04 13:57:34 WARN Utils: Your hostname, mogushi-desktop resolves to a loopback address: 127.0.1.1; using 192.168.1.20 instead (on interface eno1)
20/09/04 13:57:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/mogushi/spark/spark-3.0.0-bin-hadoop2.7/jars/spark-unsafe_2.12-3.0.0.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
20/09/04 13:57:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/09/04 13:57:43 WARN SQLExecPlugin: Installing extensions to enable rapids GPU SQL support. To disable GPU support set `spark.rapids.sql.enabled` to false
Spark context Web UI available at http://192.168.1.20:4040
Spark context available as 'sc' (master = local, app id = local-1599195457578).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0
      /_/
         
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 11.0.8)
Type in expressions to have them evaluated.
Type :help for more information.
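
Before timing anything, it is worth confirming the plugin is actually loaded. The startup log above mentions spark.rapids.sql.enabled, and spark.plugins was passed on the command line, so a quick check from the REPL (my own addition, just a sanity check) looks like this:

scala> spark.conf.get("spark.plugins")                      // should print com.nvidia.spark.SQLPlugin
scala> spark.conf.get("spark.rapids.sql.enabled", "true")   // GPU SQL support is on unless this is false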

Now let's run the same join. You can see it works as shown below.

The timed run took 1440 ms, roughly 4x faster than on the CPU.

scala> val df = sc.makeRDD(1 to 10000000, 6).toDF
df: org.apache.spark.sql.DataFrame = [value: int]

scala> val df2 = sc.makeRDD(1 to 10000000, 6).toDF
df2: org.apache.spark.sql.DataFrame = [value: int]

scala> df.select( $"value" as "a").join(df2.select($"value" as "b"), $"a" === $"b").count
res0: Long = 10000000                                                           

scala> val start = System.currentTimeMillis; df.select( $"value" as "a").join(df2.select($"value" as "b"), $"a" === $"b").count; println(System.currentTimeMillis - start);
1440                                                                            
start: Long = 1599198004715
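
To check whether the join really ran on the GPU instead of falling back to the CPU, you can also print the physical plan. If the plugin replaced the operators, GPU nodes should appear in place of the usual CPU ones (the exact operator names depend on the plugin version, so treat this as a rough check I added myself):

scala> df.select($"value" as "a").join(df2.select($"value" as "b"), $"a" === $"b").explain()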

If there is not enough GPU memory, an error like the one below appears, but the shell is still usable.

With these settings, the GPU needs at least 9912.9375 MB of free memory.

20/09/04 14:18:27 WARN GpuDeviceManager: Initial RMM allocation(9912.9375 MB) is larger than free memory(9910.625 MB)
20/09/04 14:18:27 ERROR GpuDeviceManager: Could not initialize RMM
ai.rapids.cudf.CudfException: CNMEM error at: /usr/local/rapids/include/rmm/mr/device/cnmem_memory_resource.hpp142: CNMEM_STATUS_OUT_OF_MEMORY
	at ai.rapids.cudf.Rmm.initializeInternal(Native Method)
	at ai.rapids.cudf.Rmm.initialize(Rmm.java:160)
	at com.nvidia.spark.rapids.GpuDeviceManager$.initializeRmm(GpuDeviceManager.scala:192)
	at com.nvidia.spark.rapids.GpuDeviceManager$.initializeMemory(GpuDeviceManager.scala:222)
	at com.nvidia.spark.rapids.GpuDeviceManager$.initializeGpuAndMemory(GpuDeviceManager.scala:126)
	at com.nvidia.spark.rapids.RapidsExecutorPlugin.init(Plugin.scala:230)
	at org.apache.spark.internal.plugin.ExecutorPluginContainer.$anonfun$executorPlugins$1(PluginContainer.scala:111)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
	at org.apache.spark.internal.plugin.ExecutorPluginContainer.<init>(PluginContainer.scala:99)
	at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:164)
	at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:152)
	at org.apache.spark.executor.Executor.$anonfun$plugins$1(Executor.scala:158)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:221)
	at org.apache.spark.executor.Executor.<init>(Executor.scala:158)
	at org.apache.spark.scheduler.local.LocalEndpoint.<init>(LocalSchedulerBackend.scala:64)
	at org.apache.spark.scheduler.local.LocalSchedulerBackend.start(LocalSchedulerBackend.scala:132)
	at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:201)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:550)
	at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2555)
	at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$1(SparkSession.scala:930)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
	at org.apache.spark.repl.Main$.createSparkSession(Main.scala:106)
	at $line3.$read$$iw$$iw.<init>(<console>:15)
	at $line3.$read$$iw.<init>(<console>:42)
	at $line3.$read.<init>(<console>:44)
	at $line3.$read$.<init>(<console>:48)
	at $line3.$read$.<clinit>(<console>)
	at $line3.$eval$.$print$lzycompute(<console>:7)
	at $line3.$eval$.$print(<console>:6)
	at $line3.$eval.$print(<console>)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:745)
	at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1021)
	at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:574)
	at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:41)
	at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:37)
	at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
	at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:600)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:570)
	at scala.tools.nsc.interpreter.IMain.$anonfun$quietRun$1(IMain.scala:224)
	at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
	at scala.tools.nsc.interpreter.IMain.quietRun(IMain.scala:224)
	at org.apache.spark.repl.SparkILoop.$anonfun$initializeSpark$2(SparkILoop.scala:83)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.repl.SparkILoop.$anonfun$initializeSpark$1(SparkILoop.scala:83)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.tools.nsc.interpreter.ILoop.savingReplayStack(ILoop.scala:99)
	at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:83)
	at org.apache.spark.repl.SparkILoop.$anonfun$process$4(SparkILoop.scala:165)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.tools.nsc.interpreter.ILoop.$anonfun$mumly$1(ILoop.scala:168)
	at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
	at scala.tools.nsc.interpreter.ILoop.mumly(ILoop.scala:165)
	at org.apache.spark.repl.SparkILoop.loopPostInit$1(SparkILoop.scala:153)
	at org.apache.spark.repl.SparkILoop.$anonfun$process$10(SparkILoop.scala:221)
	at org.apache.spark.repl.SparkILoop.withSuppressedSettings$1(SparkILoop.scala:189)
	at org.apache.spark.repl.SparkILoop.startup$1(SparkILoop.scala:201)
	at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:236)
	at org.apache.spark.repl.Main$.doMain(Main.scala:78)
	at org.apache.spark.repl.Main$.main(Main.scala:58)
	at org.apache.spark.repl.Main.main(Main.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)