Hadoop Streamingで、MapReduceをPHPで実装する


はじめに

こんにちは、t_oookaです。
今回は、Hadoop Streamingを使用して、PHPでmap,reduce処理を実装してみます。

Hadoop Streamingとは、hadoopが提供するユーティリティで、当該ユーティリティを使用することでJava以外の任意の言語やコマンド(標準入力、標準出力を扱える)などでmap,reduce処理を記述することができるようになります。

環境

  • CentOS 6.5
  • jdk 1.7.0_55
  • CDH(Cloudera’s Distribution including Apache Hadoop) 5
  • PHP 5.3.3

構成

完全分散モードで、管理ノード(hadoop-master)1台、計算ノード(hadoop-slave)1台及びクライアント(hadoop-client)1台の合計3台の構成です。

なお、ジョブの投入などはクライアント(hadoop-client)から実行することとします。

Hadoop Streamingの実行方法

$ hadoop                                                          \
   jar     /path/to/hadoop-streaming.jar                          \
  -files   /path/to/sample_mapper.php,/path/to/sample_reducer.php \
  -input   /path/to/input_dir                                     \
  -output  /path/to/output_dir                                    \
  -mapper  mapper_script_name                                     \
  -reducer reducer_script_name

Hadoop Streamingを利用する場合は、hadoop jarコマンドに「hadoop-streaming.jar」を指定します。

その他のオプションについては、以下の通りです。

オプション 内容
files hadoopのクラスタへコピーするファイルを指定します。複数指定する場合は、「,」で区切ります
input 解析対象のファイル名または、ディレクトリ名を指定します
output 解析結果を出力するディレクトリ名を指定します
mapper map処理を実装したスクリプトをなどを指定します
reducer reduce処理を実装したスクリプトなどを指定します

PHPによるMapReduceの実装

今回は、AWSのELBのアクセスログからELBのステータスコード毎の件数を集計する処理を実装して動作を確認します。

ELBのアクセスログは、1行がスペースを区切り文字として13項目で構成されます。
※ 詳細はElastic Load Balancing 開発者ガイド アクセスログをご参照ください。

map処理の実装

map処理では、hadoopの仕組みにより標準入力へELBのアクセスログが行単位で渡されてくるため、1行ごとに標準入力より読み取り、”ステータスコード”<TAB>”件数”を出力するようにしています。
※ 今回の場合は、アクセスログ1行につき1件のため”件数”は1固定にしています。

sample_mapper.php

#!/usr/bin/php
<?php
while( ! feof(STDIN) ) {
  // ELBのアクセスログ1行分を標準入力から読込
  $line = trim(fgets(STDIN));
  if($line == '') continue;

  // スペースを区切り文字として、項目を分割して配列へ格納
  $values = explode(" ", $line);
  // 8項目目の「elb_status_code」をステータスコードとして"ステータスコード"<TAB>"1"として出力
  printf("%s\t1\n", $values[7]);
}
?>

reduce処理の実装

reduce処理では、map処理での出力が”ステータスコード”でソートされた状態で、標準入力に渡されてきます。
今回はステータスコード毎の件数を取得したいため、単純に件数分を合計する実装としています。

sample_reducer.php

#!/usr/bin/php
<?php

$current_code = null;
$total = 0;

while ( ! feof(STDIN) ) {
  // map処理の出力 "ステータスコード"<TAB>"1"
  $line = trim(fgets(STDIN));
  if($line == '') continue;

  list($code, $count) = explode("\t", $line);
  if ($current_code != null && $current_code != $code) {
    printf("%s\t%d\n", $current_code, $total);
    $current_code = $code;
    $total = $count;
  } else {
    $current_code = $code;
    $total = $total + $count;
  }
}
printf("%s\t%d\n", $current_code, $total);
?>

※ hadoop-streamingの動作イメージは第28回 RubyとHadoopで分散処理 Hadoop Streamingの仕組みに、わかりやすい解説がありますので、こちらをご参照ください。

動作確認

Hadoop Streamingで、MapReduce処理を実行してみます。

S3上に保存されたELBのアクセスログ(100ファイル、8,014,771行)を解析し、結果をHDFS上へ出力します。

$ hadoop                                                               \
 jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar                    \
 -files /tmp/elblogs/sample_mapper.php,/tmp/elblogs/sample_reducer.php \
 -input   s3n://hadoop-streaming/input                                 \
 -output  output/elblogs/php                                           \
 -mapper  sample_mapper.php                                            \
 -reducer sample_reducer.php

解析結果を確認します。

$ hadoop fs -ls  'output/elblogs/php/'
Found 2 items
-rw-r--r--   3 hdfs hadoop          0 2014-09-25 08:35 output/elblogs/php/_SUCCESS
-rw-r--r--   3 hdfs hadoop         44 2014-09-25 08:35 output/elblogs/php/part-00000
$ sudo -u hdfs hadoop fs -cat 'output/elblogs/php/part-*'
200     7998596
302     15629
400     4
403     291
404     251

おまけ

hadoopからAmazon S3を利用する

hadoopで使用するファイルシステムはHDFSに限定されず、Amazon S3も利用できるようになっています。
S3を利用するために、以下の設定を追加しています。

/etc/hadoop/conf/core-site.xml

<configuration>
  ...(省略)...
  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>${s3のアクセスキーID}</value>
  </property>
  <property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>${s3のシークレットキー}</value>
  </property>
</configuration>

Javaで実装した場合とのパフォーマンスを検証してみる

timeコマンドよる処理時間の比較

PHP(Hadoop Streaming)の場合の処理時間

回数 real user sys
1 6m 0.508s 2m46.480s 2m29.841s
2 5m56.944s 2m48.870s 2m27.128s
3 5m48.787s 2m49.020s 2m28.295s
4 6m 1.281s 2m46.608s 2m29.689s
5 5m58.266s 2m47.257s 2m28.369s

Javaの場合の処理時間

回数 real user sys
1 3m19.946s 1m49.552s 0m20.004s
2 2m34.365s 1m45.980s 0m19.789s
3 2m48.140s 1m46.779s 0m19.896s
4 2m56.134s 1m48.744s 0m19.982s
5 2m36.993s 1m48.872s 0m19.816s

Javaでの実装

com.fancs.adn.StatusCodeCounter.java

package com.fancs.adn;

import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class StatusCodeCounter {

  public static void main(String[] args) {
   boolean result = false;
    try {
      // ジョブの設定
      Configuration conf = new Configuration();
      Job job = new Job(conf, "StatusCodeCounter");
      job.setJarByClass(StatusCodeCounter.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      // map処理を実装したクラスの指定
      job.setMapperClass(MapperForStatusCodeCounter.class);
      // reduce処理を実装したクラスを指定
      job.setReducerClass(ReducerForStatusCodeCounter.class);
      // map処理の入力元を引数から取得して指定する
      FileInputFormat.addInputPath(job, new Path(args[0]));
      // reudce処理の出力先を引数から取得して指定する
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      // ジョブの実行を開始する
      result = job.waitForCompletion(true);
    } catch(Exception e) {
      e.printStackTrace();
    } finally {
      System.exit(result ? 0 : 1);
    }
  }

  public static class MapperForStatusCodeCounter extends
    Mapper<LongWritable, Text, Text, IntWritable> {

    private Text outKey = new Text();
    private IntWritable outValue = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
      // ELBのアクセスログ1行を文字列として取得
      String line = value.toString();
      if(StringUtils.isEmpty(line)) {
        return;
      }
      line = line.trim();
      // スペースを区切り文字として、各項目を配列へ格納
      String[] items = line.split("\\s");
      // 8項目目のELBのステータスコードをキーとして設定
      outKey.set(items[7]);
      // key : value = "ステータスコード" : 1 として出力
      context.write(outKey, outValue);
    }
  }

  public static class ReducerForStatusCodeCounter extends
    Reducer<Text, IntWritable, Text, IntWritable> {

    Text outKey = new Text();
    IntWritable outValue = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {

      // ステータスコードをキーとしてそのまま設定
      outKey.set(key);
      int count = 0;

      /*
       * ステータスコードに対する件数が、[1, 1, 1, ....]の形式で
       * 渡されてくるため、件数分イテレートして件数をカウント
       */
      Iterator<IntWritable> ite = values.iterator();
      while(ite.hasNext()) {
        int c = ite.next().get();
        count += c;
      }

      outValue.set(count);
      // key : value = "ステータスコード" : "合計"として出力
      context.write(outKey, outValue);
    }
  }
}

パッケージング(jarの生成)

hadoopで実行するため、Javaによる実装をjarファイルへパッケージングします。
上記のJavaのコンパイル及びパッケージングにApache Antを利用しました。

build.xml

<?xml version="1.0" encoding="UTF-8"?>
<project name="hadoop-streaming-sample" basedir="." default="package">

  <property name="project" value="hadoop-streaming-sample" />
  <property name="version" value="0.0.1" />
  <property name="src.dir" value="src/main/java" />
  <property name="dest.dir" value="target/classes" />

  <!-- クラスパスの設定 -->
  <path id="classpath">
    <fileset dir="/usr/lib/hadoop/client">
      <include name="*.jar" />
    </fileset>
    <pathelement location="/usr/share/java/commons-lang.jar" />
  </path>

  <!-- パッケージング(jarの生成) -->
  <target name="package" depends="compile">
    <jar destfile="${project}-${version}.jar" basedir="${dest.dir}" />
  </target>

  <!-- Javaのコンパイル -->
  <target name="compile" depends="clean">
    <mkdir dir="${dest.dir}" />
    <javac srcdir="${src.dir}" destdir="${dest.dir}">
      <classpath>
        <path refid="classpath" />
      </classpath>
    </javac>
  </target>

  <target name="clean">
    <delete dir="${dest.dir}" />
    <delete file="${project}-${version}.jar" />
  </target>
</project>

antでパッケージングを実行します。

$ ant
Buildfile: build.xml

clean:
   [delete] Deleting directory /path/to/projects/hadoop-streaming-sample/target/classes
   [delete] Deleting: /path/to/projects/hadoop-streaming-sample/hadoop-streaming-sample-0.0.1.jar

compile:
    [mkdir] Created dir: /path/to/projects/hadoop-streaming-sample/target/classes
    [javac] Compiling 2 source files to /path/to/projects/hadoop-streaming-sample/target/classes
    [javac] Note: Some input files use or override a deprecated API.
    [javac] Note: Recompile with -Xlint:deprecation for details.

package:
      [jar] Building jar: /path/to/projects/hadoop-streaming-sample/hadoop-streaming-sample-0.0.1.jar

BUILD SUCCESSFUL
Total time: 1 second

実行

$ hadoop jar hadoop-streaming-sample-0.0.1.jar \
    com.fancs.adn.StatusCodeCounter            \
    s3n://hadoop-streaming/input               \
    output/elblogs/java

Javaで実装した場合は、hadoop jarコマンドにパッケージングしたjarファイルを指定します。
そのあとの引数は、順に、ジョブの設定を記述したmainメソッドをもつクラス名、map処理に入力元、reduce処理の出力先を指定します。

所感

Hadoopというと、なぜかJavaで実装しなくてはというような思い込みがありmap,reduce処理を実装するのはちょっと…面倒という印象がありましたが、Hadoop Streamingを使用すれば、簡単なものであればシェルやコマンドで十分実装できてしまうので、お手軽に分散処理を実装したい場合は、アリだなと思いました。

今回の場合は、思っていた以上に処理時間の差がでてしまったのですが、全てがHadoop Streamingの仕組み上のオーバヘッドなのかどうかはソースコードを調べるなど今後の課題にしたいと思います。
オーバヘッドなしで利用したいのであれば、Javaで、ある程度のオーバヘッドが許容できるのであればHadoop Streamingを選択するなどの考え方ができそうです。

参考

Add a Comment

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です


*