Apache Spark とは【入門】

スポンサーリンク

初めに

本記事は、以下のビッグデータ分析基盤シリーズの Apache Spark 編です。

対象者

スポンサーリンク

Apache Spark とは

簡単に言うとたくさんのコンピュータを使ってめっちゃ早く計算するためのライブラリ群です。かしこまった言い方をすると・・・

Apache Spark は並列分散処理の基盤上にて、インメモリで処理を行うためのコンピューティングフレームワークです。

並列分散処理とは

並列分散処理とは、複数のコンピュータが協力して処理を行うことです。

1 人で作業するより、10 人で分担したほうが速いよねっていう話です。ただし、あまりにも簡単な作業の場合、分担作業をする間に一人でやったほうが早い点には注意してください。

ビッグデータ分析において Spark の立ち位置は ETL, SQL クエリエンジン(Spark SQL)に当たります。詳細は以下の記事をご覧ください。

Apache Spark は単体(Spark Standalone Cluster)で起動することも、Hadoop で起動することも可能です。

Hadoop で起動する場合は、SparkHadoop 上でどの機能に位置するのかについては以下の記事をご覧ください。

インメモリとは

インメモリとは、データストレージとしてストレージデバイス(SSD, HDD)ではなく、メインメモリを利用する方式のことです。

ストレージデバイスやメインメモリや後述するレジスタの違いは以下の記事のハードウェアの章で解説しています。

インメモリの利点を見るために、従来方式とインメモリ方式を比較します

従来方式:ストレージデバイスをデータストレージとして利用する場合

HDD や SSD をデータストレージとして利用する場合、以下のように処理を実施します。

上記の方式を見て、「Result1 を 1 度 HDD, SDD に書き戻して、もう 1 度読み直す処理なんて非効率だ!Result1 をメインメモリに置きっぱなしにしよう!」という考えに基づいて開発されたのがインメモリ方式の Apache Spark です。

インメモリ方式:メインメモリをデータストレージとして利用する場合

メインメモリをデータストレージとして利用する場合、以下のように処理を実施します。Apache Sparkフレームワークではこちらの方式を利用します。また、最終結果 Result2 のデータストレージとして HDD, SSD に Result2 を出力することも可能です。

インメモリを利用すると高速化される点は以下の 2 点です。

  • HDD, SSD へ Result1 の書き込みをスキップすることで高速化
  • HDD, SSD の代わりにメインメモリから Result1 を読み込むことで高速化

前者は手順が無くなるため高速化され、後者はメインメモリと SSD へのアクセス速度の差で高速化されます。

下記のブログによるとメインメモリへのアクセス速度は SSD へのアクセス速度と比較して 500 倍高速だそうです。メインメモリからの読み込みが増えるほど高速化が期待できますね。

  • メインメモリへのアクセスは 100 * 10-9 s
  • NVMe SSD へのアクセスは 50 * 10-6 s
各種メモリ/ストレージのアクセス時間,所要クロックサイクル,転送速度,容量の目安 - Qiita
各種メモリ/ストレージについて,2023年時点で標準的なアクセス時間,所要クロックサイクル,転送速度,容量を,各種カタログスペックを参考にまとめてみました.レジスタ(レジスタファイル)最近のCP…

さて、ここでもう 1 度 Apache Spark の説明を見て見ましょう。

Apache Spark は並列分散処理の基盤上にて、インメモリで処理を行うためのコンピューティングフレームワークです。

Apache Spark は大規模なデータに対して高速に処理するために用意されたフレームワークであることが理解できるはずです。

スポンサーリンク

Apache Spark をインストール

  1. sudo yum install -y java-1.8.0-openjdk
  2. 現在ダウンロード可能なバージョンを http://ftp.jaist.ac.jp/pub/apache/spark/ で確認します。

    spark-3.x.x をクリック

    URL と Spark 名の名前を合わせたものをメモします。

    URL + <ダウンロードするファイル名> を手順2の curl コマンドで指定する

    上記でメモした URL + spark 名を以下のコマンドでインストールします。赤線の場所で spark のバージョンを指定しています。

    curl http://ftp.jaist.ac.jp/pub/apache/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz -O
  3. sudo tar zxvf spark-3.0.1-bin-hadoop2.7.tgz -C /usr/local/lib/

    以下のエラーメッセージが出た場合

    tar: これは tar アーカイブではないようです
    
    gzip: stdin: not in gzip format
    tar: Child returned status 1
    tar: Error is not recoverable: exiting now

    spark で最新バージョンがリリースされており、古い URL が存在しないと考えられます。Spark の最新バージョンの URL を確認するために、手順2からやり直してください。

  4. sudo ln -s /usr/local/lib/spark-3.0.1-bin-hadoop2.7/ /usr/local/lib/spark
  5. vim ~/.bash_profile
    export SPARK_HOME=/usr/local/lib/spark
    export PATH=$SPARK_HOME/bin:$PATH
  6. bash_profile のリロード
    source ~/.bash_profile
  7. pyspark
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.0.0
          /_/
    
    Using Python version 2.7.16 (default, Dec 12 2019 23:58:22)
    SparkSession available as 'spark'.
    なお、Scala 版は spark-shell コマンドで起動できます。

無事に Apache Spark の インタラクティブシェルが起動できました。ちなみにこの状態では、単体サーバーで Apache Spark を起動しています。

並列分散処理の基盤上で pyspark を起動させたい場合

ここでは、並列分散処理の基盤に Spark Standalone Cluster を利用して pyspark を起動させてみます。Spark Standalone Cluster に参加するそれぞれのサーバーを「ノード」と言います。ノードは役割によって次の 2 種類が存在します。

  • 「マスターノード」: クラスターを管理するノード
  • 「スレーブノード」: 実際に計算を行うノード
以降ではサーバー A をマスターノード、サーバー B をスレーブノードとする Spark Standalone Cluster を構築します。

サーバー A の作業

  1. サーバー A を Spark Standalone Cluster のマスターノードとして稼働する
    $ cd $SPARK_HOME
    $ ./sbin/start-master.sh
  2. localhost:8080 にアクセスして、マスターノードの URL を確認する。

サーバー B の作業(要: Apache Spark インストール)

  1. サーバー B を Spark Standalone Cluster のスレーブノードとして稼働する
    $ cd $SPARK_HOME
    $ ./sbin/start-slave.sh "<マスターノードの URL>"
  2. サーバー B クラスターに参加して PySpark を起動
    $ pyspark --master "<マスターノードの URL>"

おめでとうございます。並列分散処理の基盤(Spark Standalone Cluster)上で pyspark を起動することができました。

なお、pyspark コマンドのオプション --master では Spark Standalone Cluster 以外のマスターノードも指定できます。詳細な説明は下記のリンクに記載があります。

SparkContextメモ(Hishidama's Apache Spark SparkContext Memo)

Apache Spark の使い方

pyspark 上で Spark Python API を利用して Apache Spark を動かしてみます。

Apache Spark ではデータを RDD (resilient distributed dataset) と呼ばれるコレクションとして扱うことで、インメモリ上で並列処理が可能となります。

RDD の細かい仕様は今は気にする必要はありませんが、詳しく知りたい人はこちらがわかりやすいです。

テスト用の List 型データ list を作成

list = [1,2,3,4,5]

RDD を作成 parallelize()

Spark 上でデータを操作するためには、RDD 型に変換する必要があります。RDD 型にはparallelize() メソッドを利用して変換します。

rdd = sc.parallelize(list)

RDD の要素をカウント count()

rdd.count()
5

RDD の要素を値の大きいものから順番に返す top()

値の大きい要素、上位2つを返却してみます。

rdd.top(2)
[5, 4]

値の大きいもの 2 つが返却されていることがわかります。

とても簡単ですね。後は自分のやりたい計算に応じて、以下の記事を参考に API を探してみてください。

Spark API チートシート - Qiita
目的Sparkのよく使うAPIを(主に自分用に)メモしておくことで、久しぶりに開発するときでもサクサク使えるようにしたい。とりあえずPython版をまとめておきます(Scala版も時間があれば加筆…

本記事は入門編なので、ここで終わりにしたいと思います。この記事では Apache Spark の概要からインストールし実際に API を操作するところまでで説明しました。Apache Spark って意外と簡単に触れるんだよってことが多くの人に伝われば幸いです。

関連記事

ビッグデータ分析基盤入門シリーズの続きは以下です。