Apache Spark 入門

ビッグデータ

対象者

  • Apache Spark をゼロ知識から知りたい人
  • Hadoop は一旦置いておいて Apache Spark 単体を学習したい人
  • とりあえず Apache Spark が動く環境が欲しい人

Apache Spark とは

簡単に言うとたくさんのコンピュータを使ってめっちゃ早く計算するためのライブラリ群です。かしこまった言い方をすると・・・

Apache Spark は並列分散処理の基盤上にて、インメモリで処理を行うためのコンピューティングフレームワークです。

並列分散処理とは

並列分散処理とは、複数のコンピュータが協力して処理を行うことです。

1 人で作業するより、10 人で分担したほうが速いよねっていう話です。ただし、あまりにも簡単な作業の場合、作業を 10 分割して、作業表を 10 人に配って・・・と準備する間に自分 1 人でやってしまった方が速い点には注意してください。1 秒未満で終わるような処理を分散させる必要はありません。

よく使われる並列分散処理の基盤としては次の 3 つです。

Apache Spark は並列分散処理の基盤に Hadoop を選択することが多いですが、Apache Spark の 並列分散処理の基盤である Spark Standalone Cluster でも動作します。また、単体サーバーでも動作可能です。

インメモリとは

インメモリとは、データストレージとして HDD や SSD ではなく、メインメモリを利用する方式のことです。インメモリの利点を見るために、まずはインメモリ方式では無い(データストレージとして HDD や SSD を利用する)場合を見てみます。

HDD や SSD をデータストレージとして利用する場合

HDD や SSD をデータストレージとして利用する場合、以下のように処理を実施します。

上記の方式を見て、「Result1 を 1 度 HDD, SDD に書き戻して、もう 1 度読み直す処理なんて非効率だ!Result1 をメインメモリに置きっぱなしにしよう!」という考えに基づいて開発されたのがインメモリ方式の Apache Spark です。

メインメモリをデータストレージとして利用する場合: インメモリ方式

メインメモリをデータストレージとして利用する場合、以下のように処理を実施します。Apache Spark のフレームワークではこちらの方式を利用します。また、最終結果 Result2 のデータストレージとして HDD, SSD に Result2 を出力することも可能です。

インメモリを利用すると高速化される点は以下の 2 点です。

  • HDD, SSD へ Result1 の書き込みをスキップすることで高速化
  • HDD, SSD の代わりにメインメモリから Result1 を読み込むことで高速化

前者は手順が無くなるため高速化され、後者はメインメモリと SSD へのアクセス速度の差で高速化されます。

下記のブログによるとメインメモリへのアクセス速度は SSD へのアクセス速度と比較して 500 倍高速だそうです。メインメモリからの読み込みが増えるほど高速化が期待できますね。

  • メインメモリへのアクセスは 100 * 10-9 s
  • NVMe SSD へのアクセスは 50 * 10-6 s
各種メモリ/ストレージのアクセス時間,所要クロックサイクル,転送速度,容量の目安 - Qiita
各種メモリ/ストレージについて,2020年時点で標準的なアクセス時間,所要クロックサイクル,転送速度,容量を,各種カタログスペックを参考にまとめてみました。 レジスタ(レジスタファイル) 最近のCPUのレジスタ(register...

さて、ここでもう 1 度 Apache Spark の説明を見て見ましょう。

Apache Spark は並列分散処理の基盤上にて、インメモリで処理を行うためのコンピューティングフレームワークです。

Apache Spark は大規模なデータに対して高速に処理するために用意されたフレームワークであることが理解できるはずです。

Apache Spark をインストールする

  1. Java のインストール(インストールしていない場合)
    sudo yum install -y java-1.8.0-openjdk
  2. Apache Spark のインストール
    (以降の手順で、赤線部分はダウンロードする Apache Spark のバージョンによって変更してください。現在ダウンロード可能なバージョンは http://ftp.jaist.ac.jp/pub/apache/spark/ から確認してください。)
    $ curl http://ftp.jaist.ac.jp/pub/apache/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz -O
  3. ダウンロードしたファイルを解凍
    $ sudo tar zxvf spark-3.0.0-bin-hadoop2.7.tgz -C /usr/local/lib/
  4. シンボリックリンクの追加
    $ sudo ln -s /usr/local/lib/spark-3.0.0-bin-hadoop2.7/ /usr/local/lib/spark
  5. 環境変数の追加
    $ vim ~/.bash_profile
    export SPARK_HOME=/usr/local/lib/spark
    export PATH=$SPARK_HOME/bin:$PATH
  6. bash_profile のリロード
    $ source /etc/profile
  7. PySpark(Python 版の Spark-shell)を起動。(Scala 版は spark-shell コマンド)
    $ pyspark
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.0.0
          /_/
    
    Using Python version 2.7.16 (default, Dec 12 2019 23:58:22)
    SparkSession available as 'spark'.
    

無事に Apache Spark の インタラクティブシェルが起動できました。ちなみにこの状態では、単体サーバーで Apache Spark を起動しています。

並列分散処理の基盤上で pyspark を起動させたい場合

ここでは、並列分散処理の基盤に Spark Standalone Cluster を利用して pyspark を起動させてみます。Spark Standalone Cluster に参加するそれぞれのサーバーを「ノード」と言います。ノードは役割によって次の 2 種類が存在します。

  • 「マスターノード」: クラスターを管理するノード
  • 「スレーブノード」: 実際に計算を行うノード
以降ではサーバー A をマスターノード、サーバー B をスレーブノードとする Spark Standalone Cluster を構築します。

サーバー A の作業

  1. サーバー A を Spark Standalone Cluster のマスターノードとして稼働する
    $ cd $SPARK_HOME
    $ ./sbin/start-master.sh
  2. localhost:8080 にアクセスして、マスターノードの URL を確認する。

サーバー B の作業(要: Apache Spark インストール)

  1. サーバー B を Spark Standalone Cluster のスレーブノードとして稼働する
    $ cd $SPARK_HOME
    $ ./sbin/start-slave.sh "<マスターノードの URL>"
  2. サーバー B クラスターに参加して PySpark を起動
    $ pyspark --master "<マスターノードの URL>"

おめでとうございます。並列分散処理の基盤(Spark Standalone Cluster)上で pyspark を起動することができました。

なお、pyspark コマンドのオプション –master では Spark Standalone Cluster 以外のマスターノードも指定できます。詳細な説明は下記のリンクに記載があります。

SparkContextメモ(Hishidama's Apache Spark SparkContext Memo)

Apache Spark を動かしてみる

pyspark 上で Spark Python API を利用して Apache Spark を動かしてみます。

Apache Spark ではデータを RDD (resilient distributed dataset) と呼ばれるコレクションとして扱います。RDD の細かい仕様は今は気にする必要はありませんが、詳しく知りたい人はこちらがわかりやすいです。

RDD を作成 parallelize()

parallelize() メソッドを利用して、リストデータ list から RDD を作成しています。

$ pyspark

>>> list = [1,2,3,4,5]
>>> rdd = sc.parallelize(list)

ちなみに pyspark では予め下記のコードが実行されていると理解ください。

from pyspark import SparkContext

sc = SparkContext()

RDD の要素をカウント count()

>>> rdd.count()
5

RDD の要素を値の大きいものから順番に返す top()

rdd.top(2)
[5, 4]

2 を指定しているので、値の大きいもの 2 つが返却されていることがわかります。

とても簡単ですね。API の一覧は下記の記事がわかりやすくまとまっています。

Spark API チートシート - Qiita
目的 Sparkのよく使うAPIを(主に自分用に)メモしておくことで、久しぶりに開発するときでもサクサク使えるようにしたい。とりあえずPython版をまとめておきます(Scala版も時間があれば加筆するかも) このチートシート...

ここまで理解できると、後は自分でプログラムをガリガリ書いて理解を深めていけると思います。Apache Spark って意外と簡単に触れるんだよってことが多くの人に伝われば幸いです。

0

コメント

タイトルとURLをコピーしました