MongoDBとHiveでごにょごにょ事始め


こんにちは、s_mamedaifukuです。
先頭の ”s” は「塩」の ”s” です。

MongoDBとHiveの連携を試みている今日この頃です。

各ソフトウェアのバージョンは
MongoDB : 2.6.1
Hadoop(EMR) : 1.0.3
Hive : 0.11.0.1
です。

HiveからMongoDBのデータを操作する

「MongoDB Java Driver 」と「MongoDB Connector for Hadoop」を使うことで、MongoDBをHadoopの入出力先とすることが出来ます。

今回は、mongo-java-driver-2.12.3.jar、mongo-hadoop-core-1.3.0.jarの2つ、そしてHiveを使用するためmongo-hadoop-hive-1.3.0.jarを加えた3つのJarファイルを予めダウンロードしS3へ置いておきます。

EMRを使う場合は、bootstrapスクリプトで3つのJarファイルを/home/haodoop/libへコピーします。
bootstrap.sh


#!/bin/sh

for JAR in mongo-java-driver-2.12.3.jar mongo-hadoop-core-1.3.0.jar mongo-hadoop-hive-1.3.0.jar
do
  if [ ! -f /home/hadoop/lib/$JAR ]; then
    hadoop fs -copyToLocal s3://<bukect_name>/lib/$JAR /home/hadoop/lib/
  fi
done

bootstrapスクリプトを指定して、EMRクラスタ立ち上げます。ここではインタラクティブモードで立ち上げています。

$ elastic-mapreduce \
--create \
--name mongodb_test \
--num-instances <num_instance> \
--instance-type <instance_type> \
--bootstrap-action s3://<bucket_name>/emr/script/bootstrap.sh \
--hive-versions 0.11.0.1 \
--hadoop-version 1.0.3 \
--hive-interactive \
--alive

MongoDBのコレクションに対応するテーブルをHive上に作成します。

hive > CREATE EXTERNAL TABLE users (
     >   h_name STRING,
     >   h_number INT,
     >   h_created TIMESTAMP
     > )
     > STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
     > WITH SERDEPROPERTIES('mongo.columns.mapping'='{"h_name":"name","h_number":"number","h_created":"created"}')
     > TBLPROPERTIES('mongo.uri'='mongodb://<mongodb_server>:27017/test.users');

カラム名に「h_」とプレフィックスを付けていますが、MongoDBのコレクションのフィールド名と揃えてしまっても問題ありません。
その場合は「 WITH SERDEPROPERTIES」の部分は必要なくなります。

ここでMongoDBにテストデータを保存してみると、

$ mongo --quiet test  << EOS 
> for (var i = 1; i<= 1000 ; i++) {
>   var user = {
>     name : "name" + i ,
>     number :  i ,
>     created : new Date()
>   };
>
>   db.users.save(user);
> }
> EOS

Hive上からも確認できるようになりました。

hive> SELECT * FROM users;
OK
name1   1       2014-07-28 10:18:19.764
name2   2       2014-07-28 10:18:19.771
name3   3       2014-07-28 10:18:19.772
name4   4       2014-07-28 10:18:19.773
name5   5       2014-07-28 10:18:19.773
...

MongoDBのISODate型とHiveのTIMESTAMP型が対応するようになっています。
ログやHive上で時刻をJSTで扱っている場合などは注意が必要です。

以上でMongoDBのデータをHiveから操作できるようになりました。

S3上のログデータをMongoDBへ保存する

実運用ではfluent-plugin-mongoを使い、MongoDBへログデータを流し込んでいますが、
fluent-plugin-s3でS3に蓄積しておいたログにHiveで処理を加え、MongoDBへ保存するというのもありがちなパターンです。

元になるログデータはこんな感じです

2014-07-25T18:00:00+09:00       data.nend.conversion    {"name":"name1","number":1,"address":"address1","age":10,"created":"2014-07-25 18:00:00"}
2014-07-25T18:00:10+09:00       data.nend.conversion    {"name":"name2","number":2,"address":"address2","age":20,"created":"2014-07-25 18:00:10"}
2014-07-25T18:00:20+09:00       data.nend.conversion    {"name":"name3","number":3,"address":"address3","age":30,"created":"2014-07-25 18:00:20"}
2014-07-25T18:00:30+09:00       data.nend.conversion    {"name":"name4","number":4,"address":"address4","age":40,"created":"2014-07-25 18:00:30"}
2014-07-25T18:00:40+09:00       data.nend.conversion    {"name":"name5","number":5,"address":"address5","age":50,"created":"2014-07-25 18:00:40"}
...

S3上のログをHiveから読み込むために外部テーブルを作成します。
1時間毎にパスが分かれていることを想定して、テーブルでも1時間毎にパーティションを作ってみます。

CREATE EXTERNAL TABLE IF NOT EXISTS user_logs (
  time STRING,
  tag STRING,
  record STRING
)
PARTITIONED BY (
  dt STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n';

処理対象時間のパーティションを作成すると、Hiveからログを認識できるようになります。

ALTER TABLE user_logs
ADD IF NOT EXISTS PARTITION ( dt = "2014072518" )
LOCATION "s3://<bucket_name>/log/user/2014/07/25/18";

実際はHive上でもっとごにょごにょ感のある処理をしそうなものですが、
ここでは簡単に、ログデータのJSONを読み込んで必要なデータをusersテーブルにINSERTします。
MongoDBと関連付けられたHiveのテーブルに対してはINSERT OVERWRITEをしてもINSERT INTOの挙動となるようです。

INSERT INTO TABLE users
SELECT
  j.name,
  j.number,
  j.created
FROM user_logs
LATERAL VIEW json_tuple(
  user_logs.record,
  'name',
  'number',
  'created'
) j
AS
  name,
  number,
  created
WHERE dt = "2014072518";

実行後、MongoDBからデータの存在を確認できます。

mongo> db.users.find()
{ "_id" : ObjectId("53d62f9ce4b047e6d515d5fe"), "name" : "name1", "number" : 1, "created" : ISODate("2014-07-25T09:00:00Z") }
{ "_id" : ObjectId("53d62f9ce4b047e6d515d5ff"), "name" : "name2", "number" : 2, "created" : ISODate("2014-07-25T09:00:10Z") }
{ "_id" : ObjectId("53d62f9ce4b047e6d515d600"), "name" : "name3", "number" : 3, "created" : ISODate("2014-07-25T09:00:20Z") }
{ "_id" : ObjectId("53d62f9ce4b047e6d515d601"), "name" : "name4", "number" : 4, "created" : ISODate("2014-07-25T09:00:30Z") }
{ "_id" : ObjectId("53d62f9ce4b047e6d515d602"), "name" : "name5", "number" : 5, "created" : ISODate("2014-07-25T09:00:40Z") }
...

データ量を増やしていった時のパフォーマンスや
MongoDBに作成してあるインデックスのHive上での振る舞いなど
興味深い点はまだまだたくさんありますね。今後の研究課題としたいと思います。

Add a Comment

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です


*