【入門】Hadoop とは?MapReduce の使い方やエコシステム一覧

Apache HadoopApache Hadoop とは、並列分散処理を実現するミドルウェアです。

「ビッグデータを1台のコンピュータで処理すると時間がかかりすぎるため、コンピューターをいっぱい並べて高速に処理しよう。」というのが Hadoop 導入のモチベーションになります。

スポンサーリンク

初めに

本記事は、ビッグデータ分析基盤シリーズの Hadoop 編です。

本記事の対象者

  • Hadoopって何?どんな構成なの?
  • Hadoop エコシステム(できること)一覧を知りたい
  • Hadoop の環境構築をしたい
  • WordCount(Hadoop における Hello World)が実行できるようになりたい
スポンサーリンク

Hadoop を構成する3つのレイヤー

Hadoop のアーキテクチャは、主に以下の3つのレイヤーから構成されます。

また、Hadoop ではデータにアクセスするためにクエリエンジンを利用することが多いです。

Hadoop では、すべてのコンピュータに上記の構成をインストールし、データの読み書きや処理を分散します。

分散処理エンジン

分散処理エンジンは、Hadoop における並列分散処理を担うソフトウェアのことです。

デフォルトでは MapReduce と呼ばれる分散処理エンジンが稼働しています。

MapReduce 処理の概要について

MapReduce は以下の手順で分散処理を行います。

  1. Map: 入力を key-value 形式として出力、各 Map をノードごとに分散可能
  2. Shuffle: Map の出力をソート
  3. Reduce: 同じキーを集約
https://software.fujitsu.com/jp/manual/manualfiles/m160011/j2ul1930/02z200/j1930-00-01-01-00.html

代表的な分散処理エンジンの特徴は以下のとおりです。下に行くほど速いです。

Spark については以下の記事で詳細を解説しているのでご覧ください。

ちなみに MapReduce はオワコンらしいので Tez か Spark を使いましょう。

リソースマネージャー

リソースマネージャーはHadoop におけるリソースCPU ,メモリ管理を担います。

MapReduce で利用するリソースマネージャーは、アプリケーションレベルのコンテナを管理する Hadoop YARN です。Hadoop YANR の動作については下記の記事がわかりやすいです。

あの日見たYARNのお仕事を僕達はまだ知らない。 - Qiita
あなたが実行したジョブのこと考えてない間、ずっとYARNがジョブのこと考えててくれてたんだみなさんはYARNの存在をどれだけ気づいてあげられているだろうか。よくSparkと一緒にYARNって単語…

その他にも OS レベルのコンテナを管理する Apache Mesos もあります。こちらは docker と同じ技術(Linux コンテナ)を利用します。

分散ファイルシステム

分散ファイルシステムは、Hadoop におけるデータの読み書きの分散を担います。Hadoop 上で利用される分散ファイルシステムには以下のものがあります。

他にもストレージとして Cloud Storage や Blob Storage を利用可能なようですが、内部でどんな分散ファイルシステムを利用するのかは不明です。(公開されていたら誰か教えて下さい)

スポンサーリンク

Hadoop エコシステム一覧

デフォルト以外の Hadoop を構成するソフトウェア、もしくは周辺のソフトウェアを Hadoop エコシステムと言います。

Hadoop エコシステムは以下のように組み合わせることで、様々な分散処理が実現可能です。

  • データウェアハウス構成例:Hadoop + Tez + Hive
    Hive を利用して Hadoop を SQL で操作可能にします。
    Hive については以下の記事で解説しています。
  • 機械学習構成例:Hadoop + Spark
    機械学習でよく発生する反復処理を Spark のインメモリ処理を利用して効率化します。
    Spark については以下の記事で解説しています。
  • ストリーム処理構成例:各サーバーや IoT機器 --> Kafka --> Hadoop
    複数のサーバーや IoT 機器からストリーム処理を行い、Hadoop にデータを集約する場合は Kafka を利用します。
    Kafka については以下に記事で解説しています。

以下に代表的な Hadoop エコシステムや関連するシステムと、その機能を紹介します。

Hadoop エコシステム実現する機能
Apache AccumuloKVS型のNoSQL。セキュリティ重視
Apache Atlasガバナンスコントロール、コンプライアンス対応
CascadingMapReduceを簡単に扱うAPI
Apache Drillエッジ機器のデータを操作する分散SQLエンジン
Apache falconデータライフサイクルを管理
Apache Flume複数のデータソースからHadoopに非構造化データを集約(ストリームデータ処理)
Apache HBaseKVS型のNoSQL
Apache HiveSQLライク(HiveQL)なクエリでデータを操作できる。対障害性重視する場合。DWHを実現
Apache HueHadoopやHadoopエコシステムGUIで操作
Apache ImpalaSQLライク(Impala SQL)なクエリでデータを操作できる。速度を重視する場合。リアルタイム処理を実現
Apache Kafka複数のデータソースからHadoopに非構造化データを集約(ストリームデータ処理)。flumeとの違いはここ
Apache Knox集中管理型の認証・アクセス管理
Apache Mahout線形代数、統計解析、機械学習ライブラリ
Apache MesosOS レベルのコンテナを管理するリソースマネージャー
Apache Oozieジョブのスケジューラ
Apache PhoenixHBaseをデータストアとして利用するリアルタイムRDB
Apache Pigデータの加工(ETL)ツール
Apache Ranger認証済みのユーザーに対して属性ベースでアクセス権限を付与
Apache Sentry認証済みのユーザーに対してロールベースでアクセス権限を付与
Apache SliderYARNアプリケーションの制御。長時間起動している場合はKillするなど
Apache Solr全文検索Elasticsearchで利用されています。)
Apache Spark機械学習、SQL操作、R言語、グラフをインメモリで処理
Apache SqoopRDBMSからHadoopに構造化データのインポート、エクスポート
Apache TezMapReduceより速い分散処理フレームワーク
Presto中間結果をメモリに出力する SQL クエリエンジン。 Hive との違いはこちら

Hadoop をインストール

ここからは Hadoop をインストールし、実際に Hadoop を触ってみます。

事前準備:Java 8 をインストール

sudo yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version
openjdk version "1.8.0_265"
OpenJDK Runtime Environment (build 1.8.0_265-b01)
OpenJDK 64-Bit Server VM (build 25.265-b01, mixed mode)

Hadoop をインストール

以下のサイトからダウンロードする Hadoop のバージョンに対応する URL を確認します。

Index of /pub/apache/hadoop/common

今回は現時点での最新バージョンである hadoop-3.3.0 をダウンロードします。

<現在の URL> + <ダウンロードするアーカイブ名> をメモします。

例:https://ftp.jaist.ac.jp/pub/apache/hadoop/common/hadoop-<バージョン名>/hadoop-<バージョン番号>.tar.gz

メモした URL を指定してダウンロードします。

Hadoop が更新されると、リポジトリから古いバージョンは削除されるため必ず赤線箇所のバージョンは上記でメモした URL に置き換えてください

curl https://ftp.jaist.ac.jp/pub/apache/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz -O
tar zxfv hadoop-3.3.0.tar.gz

Hadoop を利用するための設定

echo "export HADOOP_HOME=/home/`whoami`/hadoop-3.3.0" >> ~/.bash_profile
echo "export PATH=$PATH:/home/`whoami`/hadoop-3.3.0/bin" >> ~/.bash_profile
source ~/.bash_profile
mkdir -p ~/var/lib/hdfs/{name,data}

Hadoop 自体の設定

Hadoop で利用する JAVA_HOME 環境変数を設定します。

vim hadoop-3.3.0/etc/hadoop/hadoop-env.sh
export JAVA_HOME=$(dirname $(readlink $(readlink $(which java)))|sed 's/\/jre\/bin//g')

JAVA_HOME の場所は /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.265.b01-1.amzn2.0.1.x86_64 となるようなコマンドを実行しています。

次に分散ファイルシステム HDFS にアクセスする URL を設定します。

vim $HADOOP_HOME/etc/hadoop/core-site.xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

最後に Name ノード、Data ノードの設定をします。

vim $HADOOP_HOME/etc/hadoop/hdfs-site.xml
<configuration>
  <property>
    <name>dfs.name.dir</name>
    <value>/home/ec2-user/var/lib/hdfs/name</value>
  </property>
  <property>
    <name>dfs.data.dir</name>
    <value>/home/ec2-user/var/lib/hdfs/data</value>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

赤線箇所は必ず自分のホームディレクトリ(Hadoop をインストールしたディレクトリ)を指定してください。

Hadoop を起動

まずは Hadoop を初期化します。

hadoop namenode -format
$HADOOP_HOME/sbin/start-dfs.sh
sudo netstat -antp|grep 9000
tcp        0      0 127.0.0.1:9000          0.0.0.0:*               LISTEN      7707/java           
tcp        0      0 127.0.0.1:9000          127.0.0.1:37980         ESTABLISHED 7707/java           
tcp        0      0 127.0.0.1:37980         127.0.0.1:9000          ESTABLISHED 7868/java 

9000 番ポートが LISTEN になっていない場合、どこかの設定が間違っている可能性があります。

hadoop fs -mkdir -p /user/hadoop
hadoop fs -ls /user/

作成したディレクトリが表示されれば成功です。実行したコマンドの詳細は次節で解説します。補足ですが、Hadoop を停止するには「$HADOOP_HOME/sbin/stop-dfs.sh」コマンドを実行します。

Hadoop 分散ファイルシステムの使い方

Hadoop の分散ファイルシステムhadoop fs コマンドで操作します。

hadoop fs -ls /
Found 1 items
drwxr-xr-x   - ec2-user supergroup          0 2020-11-25 16:59 /user

その他にも hadoop fs コマンドでは、Linux でよく利用する cat, rm, cp など様々なオプションが用意されています。hadoop fs コマンドのオプション一覧は以下の公式ドキュメントをご覧ください。

Apache Hadoop 3.3.6 – Overview

なお、通常の ls コマンドの結果と比較することで、ローカルのファイルシステム(Linux では xfs ファイルシステムなど)とHadoop の分散ファイルシステムでは、別のファイルが管理されていることがわかります。

ls /
bin  boot  dev  etc  home  lib  lib64  local  media  mnt  opt  proc  root  run  sbin  srv  sys  tmp  usr  var

ローカルのファイルシステムから Hadoop の分散ファイルシステムにファイルを移動

Hadoop でファイルを処理するためには、Hadoop の分散ファイルシステムファイルを置く必要があります。

ローカルのファイルシステムから Hadoop の分散ファイルシステムファイルをコピーする場合は put オプションを利用します。

コピーするファイルの準備

echo "Hello World Bye World" > file01
echo "Hello Hadoop Goodbye Hadoop" > file02

put オプションを利用して分散ファイルシステムファイルをコピー

ローカルファイルシステムにあるファイル file01, file02 を Hadoop の分散ファイルシステムにある /user/hadoop パスにコピーします。

hadoop fs -put -f file01 file02 /user/hadoop
hadoop fs -ls /user/hadoop
Found 2 items
-rw-r--r--   1 hadoop hadoop         22 2020-11-24 14:10 /user/hadoop/file01
-rw-r--r--   1 hadoop hadoop         28 2020-11-24 14:10 /user/hadoop/file02

Hadoop の分散ファイルシステムの /user/hadoop パスに file01, file02 がコピーできていることがわかります。

Hadoop MapReduce で WordCount を実装

MapReduce は 分散処理エンジンの章で記載したとおり Hadoop で稼働するデフォルトの分散処理エンジンです。

ここでは Hadoop の分散処理における HelloWorld 的な立ち位置にいる WordCount プログラムを実装します。WordCount とはファイルに含まれる単語ごとの出現回数を分散処理でカウントするプログラムです。

本記事では Hadoop で王道の Java を使ったプログラミングと、僕みたいに Java が嫌いな人の逃げ先用の Python を使ったプログラミングの2種類を紹介します。

【Java】Hadoop MapReduce で WordCount を実装

Mapreduce を利用した WordCount の実装は、以下の Apache Hadoop 公式チュートリアルに従います。

Apache Hadoop 3.3.6 – MapReduce Tutorial

事前準備

WordCount プログラムでカウントする用のファイルを分散ファイルシステムに配置

前章の put オプションの説明を参考に WordCount でカウントする用のファイルを分散ファイルシステムに配置する

hadoop fs -cat /user/hadoop/file01
Hello World Bye World
hadoop fs -cat /user/hadoop/file02
Hello Hadoop Goodbye Hadoop

ソースコードを配置するディレクトリを作成

mkdir -p mkdir src/main/java

Gradle をインストール

以下を参考に Gradle をインストールします。(自分で依存関係を解決できる場合は不要です。)

WordCount プログラムを作成

vim src/main/java/WordCount.java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

WordCount プログラムをコンパイル

コンパイル用のビルドスクリプトを作成します。

vim build.gradle
plugins {
    id 'java'
}

archivesBaseName = 'wc'

//リポジトリの設定
repositories {
    mavenCentral() //https://repo.maven.apache.org/maven2/ から取得
}

//リポジトリから取得するモジュールの設定
dependencies {
    compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.0'
    compile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '3.3.0'

}

WordCount プログラムをコンパイルします。

gradle build
ls build/libs/wc.jar
build/libs/wc.jar

WordCount を実行

hadoop jar build/libs/wc.jar WordCount /user/hadoop/ /user/hadoop/output

ポートのエラーが出た場合は、メッセージに記載されたポートを開けてください。以下のエラーが出た場合はメモリ不足なので実行環境のスペックを上げてください。

There is insufficient memory for the Java Runtime Environment to continue.

hadoop fs -ls /user/hadoop/output
Found 4 items
-rw-r--r--   1 hadoop hadoop          0 2020-11-24 15:08 /user/hadoop/output/_SUCCESS
-rw-r--r--   1 hadoop hadoop          8 2020-11-24 15:08 /user/hadoop/output/part-r-00000
-rw-r--r--   1 hadoop hadoop         16 2020-11-24 15:08 /user/hadoop/output/part-r-00001
-rw-r--r--   1 hadoop hadoop         17 2020-11-24 15:08 /user/hadoop/output/part-r-00002

私の環境では3ノード実行したため、3つのファイルが作成されていることが確認できました。

正しく単語がカウントできていることを確認

part-r-0000* の内容を見ると、file01 と file02 に含まれる単語が正しくカウントできていることがわかります。

hadoop fs -cat /user/hadoop/output/part-r-0000*
Hello	2
Bye	1
Goodbye	1
Hadoop	2
World	2
hadoop fs -cat /user/hadoop/file01
Hello World Bye World
hadoop fs -cat /user/hadoop/file02
Hello Hadoop Goodbye Hadoop

処理を分散していることを確認

ファイルを確認するとノードごとにカウントするワードを手分けすることで処理を分散していることがわかります。

hadoop fs -cat /user/hadoop/output/part-r-00000
Hello	2
hadoop fs -cat /user/hadoop/output/part-r-00001
Bye	1
Goodbye	1
hadoop fs -cat /user/hadoop/output/part-r-00002
Hadoop	2
World	2

【Python】 Hadoop MapReduce で WordCount を実装

MapReduce を Java 以外の言語で使用する場合は、Hadoop Streaming というユーティリティを使用します。

Hadoop Streaming では mappers, reducers に実行可能ファイルを指定すると、指定したファイルを起動して map, reduce 処理を行います。つまり python ファイルを指定すれば python ファイルで MapReduce 処理が行えます。

Hadoop Streaming を利用した WordCount の実装は以下の Apache Hadoop 公式チュートリアルに従っています。

Apache Hadoop MapReduce Streaming – Hadoop Streaming

事前準備

hadoop fs -mkdir -p /user/$(whoami)/myInputDirs
echo "Hello World Bye World" > file01
echo "Hello Hadoop Goodbye Hadoop" > file02
hadoop fs -put -f file01 file02 /user/$(whoami)/myInputDirs

WordCount プログラムを作成

vim myAggregatorForKeyCount.py
#!/usr/bin/python3
  
import sys

def generateLongCountToken(id):
    return "LongValueSum:" + id + "\t" + "1"

def main(argv):
    line = sys.stdin.readline() #-input オプションで指定したディレクトリにあるファイルから1行ずつ読む
    try:
        while line:
            line = line[:-1]
            fields = line.split(" ") #空白区切りで単語を区切る
            for word in fields:
                print(generateLongCountToken(word)) #["LongValueSum:" + 単語 + "\t" + "1"] の形で出力
            line = sys.stdin.readline()
    except "end of file":#ファイルが終わるまで
        return None

if __name__ == "__main__":
     main(sys.argv)

Hadoop Streaming で WordCount プログラムを実行

端末上で下記のコマンドを入力します。 mapper には先程作成したプログラム、reducer には aggregate を指定します。

mapred streaming \
  -input myInputDirs \
  -output myOutputDir \
  -mapper myAggregatorForKeyCount.py \
  -reducer aggregate \
  -file myAggregatorForKeyCount.py

MapReduce で WordCount 処理が成功した場合は以下のようなファイルが作成されます。

hadoop fs -ls /user/$(whoami)/myOutputDir
Found 2 items
-rw-r--r--   1 ec2-user supergroup          0 2020-11-26 14:05 /user/ec2-user/myOutputDir/_SUCCESS
-rw-r--r--   1 ec2-user supergroup         41 2020-11-26 14:05 /user/ec2-user/myOutputDir/part-00000

part-0000*は Hadoop のノードの数だけ作成されます。(処理が分散されるため)

hadoop fs -cat /user/$(whoami)/myOutputDir/part-*
Bye	1
Goodbye	1
Hadoop	2
Hello	2
World	2

インプットしたファイルは以下のとおりのため、単語ごとに正しく出現回数をカウントできていることがわかります。

hadoop fs -cat /user/$(whoami)/myInputDirs/*
Hello World Bye World
Hello Hadoop Goodbye Hadoop

関連記事

ビッグデータ分析基盤入門シリーズの続きは以下です。


参考サイト

本記事を記載するにあたり、下記のサイト、書籍を参考にしました。

公式ドキュメント

Hadoop – Apache Hadoop 3.3.6