2015年1月2日金曜日

CouchDBのMapReduceをアプリ(python)で動かす

前回、Futon上でMapReduceを定義、動かしてみました。サンプル的にDBの集計をしたい場合はそれでいいんですが、やはりアプリ(python)からMapReduce処理を指示し、集計処理をしてみたいです。

とりあえず前回、日付毎の集計をするMapReduceを”eachdate”に作成しましたので、これをpythonからCallしてみます。(Keyを指定せずに、全部を対象にしています)

  1. # coding: utf-8
  2. import couchdb
  3.  
  4. server = couchdb.Server('http://localhost:5984')
  5. db = server['kakeibo']
  6.  
  7. result = db.view("application/eachdate")
  8.  
  9. for r in result:
  10. print r

ところがこれを実行すると以下の結果になってしまいました。

&lt Row key="None," value="2850">

なんかkeys(日付)毎に集計されるのを期待したのですが、keys無視で集計されています。色々、調べまくってみると、どうもデフォルトではreduce関数の入力はMapの結果だけではなく、Reduceの結果も再度受け入れてしまうようです。(そのため、全部集計してしまう:注)これを期待どおり、keys毎に集計したい場合は以下のOptionを入れるそうです。

  1. # coding: utf-8
  2. import couchdb
  3.  
  4. server = couchdb.Server('http://localhost:5984')
  5. db = server['kakeibo']
  6.  
  7. result = db.view("application/eachdate", group=True)
  8.  
  9. for r in result:
  10. print r

こうすると、期待どおり日付毎の集計をしてくれます。

&lt Row key=u'5/10', value=1300>
&lt Row key=u'5/11', value=650>
&lt Row key=u'5/12', value=900>


(ちなみに、ここにはReduceを行わないoptionとして'reduce=False'というのもあります)

注:Reduce関数の出力が再度Reduce関数の入力に入ってしまう、というのは説明が足りてませんでしたね。(元が英語だったので、意図を解釈しきれてなかった)これは、Mapの結果が大きいと、Map関数が入力データをsplitして複数のMap結果が生成されるそうです。(具体的にどのくらい大きいとsplitされるのかはまだ調べてません。)そうすると、各Map結果に対してReduce関数が動き、最後にそれらのReduce関数の結果をまとめるため、もう一度Reduce関数がCallされるんだそうです。同じReduce関数がCallされるので、そのままだと再集計をしてしまう、ということが起きるようです。それを避けるため、rereduceというflagがあり、Reduce関数内ではそのflagでsplitされたMap関数を処理するReduce関数なのか、最後に集計するReduce関数なのかをif文で識別して動作する、という方法を使うそうです。ここで述べたgroupとsplit、rereduceの関係はまた調べて整理しないとダメですね。

さて、MapReduceのコードをFutonで入力するのもいいですが、python上でコーディングしてしまう方法もあります。

  1. # coding: utf-8
  2. import couchdb
  3.  
  4. server = couchdb.Server('http://localhost:5984')
  5. db = server['kakeibo']
  6.  
  7. map_fun = '''function(doc) {
  8. emit(doc.date, doc.price);
  9. }'''
  10.  
  11. reduce_fun = '''function(keys, values) {
  12. return sum(values);
  13. }'''
  14.  
  15. result = db.query(map_fun, reduce_fun, group=True)
  16.  
  17. for r in result:
  18. print r

何のことはありません、関数定義を文字列として渡しているだけですね。
後、これらの結果を見るとreduce定義の引数のところで(keys, values)としましたが、特に(key, value)でもいいようです。まあわかれば当たり前の話ですが、関数定義の時の引数名なんで、順番が大事なだけですね。


PS:
最後に、ログのとり方についてだけ追記しておきます。特に今回悩んだような症状のとき、Reduce関数には何が引数に入っているか知りたくなります。そんな時、ログ関数があります。

  1. reduce_fun = '''function(keys, values, rereduce) {
  2. log(rereduce);
  3. log(keys);
  4. log(values);
  5. if (rereduce)
  6. return sum(values);
  7. else
  8. return values;
  9. }''

こんなふうにlog()を入れるだけで、/var/log/couchdb/couch.logに記録されます。これを見ると、group=Trueがある時とない時の違いがひと目でわかります。
groupの指定をしていないと以下のログが記録されます。

  1. [Sat, 03 Jan 2015 07:31:45 GMT] [info] [<0 data-blogger-escaped-.290.0="">] OS Process #Port<0 data-blogger-escaped-.2666=""> Log :: false
  2. [Sat, 03 Jan 2015 07:31:45 GMT] [info] [<0 data-blogger-escaped-.290.0="">] OS Process #Port<0 data-blogger-escaped-.2666=""> Log :: [["5/10","102d4440a25efcd79349859b67000302"],["5/10","102d4440a25efcd79349859b67000a91"],["5/11","102d4440a25efcd79349859b67000eee"],["5/11","102d4440a25efcd79349859b67001c9b"],["5/12","102d4440a25efcd79349859b67002b7f"]]
  3. [Sat, 03 Jan 2015 07:31:45 GMT] [info] [<0 data-blogger-escaped-.290.0="">] OS Process #Port<0 data-blogger-escaped-.2666=""> Log :: [300,1000,400,250,900]


id情報まで表示されるので見難いのですが、Map関数を通しても、すべての日付のリストがkeysに入っており、対応するpriceもvaluesにリストで5個集められてしまっていることがわかります。(これでは、日付毎の集計なんてされないはずだ)
一方、group=Trueの指定をすると以下のログになりました。

  1. [Sat, 03 Jan 2015 07:31:45 GMT] [info] [<0 data-blogger-escaped-.290.0="">] OS Process #Port<0 data-blogger-escaped-.2666=""> Log :: false
  2. [Sat, 03 Jan 2015 07:31:45 GMT] [info] [<0 data-blogger-escaped-.290.0="">] OS Process #Port<0 data-blogger-escaped-.2666=""> Log :: [["5/10","102d4440a25efcd79349859b67000a91"],["5/10","102d4440a25efcd79349859b67000302"]]
  3. [Sat, 03 Jan 2015 07:31:45 GMT] [info] [<0 data-blogger-escaped-.290.0="">] OS Process #Port<0 data-blogger-escaped-.2666=""> Log :: [1000,300]
  4. [Sat, 03 Jan 2015 07:31:45 GMT] [info] [<0 data-blogger-escaped-.109.0="">] 127.0.0.1 - - POST /kakeibo/_temp_view?group=true 200
  5. [Sat, 03 Jan 2015 07:31:45 GMT] [info] [<0 data-blogger-escaped-.290.0="">] OS Process #Port<0 data-blogger-escaped-.2666=""> Log :: false
  6. [Sat, 03 Jan 2015 07:31:45 GMT] [info] [<0 data-blogger-escaped-.290.0="">] OS Process #Port<0 data-blogger-escaped-.2666=""> Log :: [["5/11","102d4440a25efcd79349859b67001c9b"],["5/11","102d4440a25efcd79349859b67000eee"]]
  7. [Sat, 03 Jan 2015 07:31:45 GMT] [info] [<0 data-blogger-escaped-.290.0="">] OS Process #Port<0 data-blogger-escaped-.2666=""> Log :: [250,400]
  8. [Sat, 03 Jan 2015 07:31:45 GMT] [info] [<0 data-blogger-escaped-.290.0="">] OS Process #Port<0 data-blogger-escaped-.2666=""> Log :: false
  9. [Sat, 03 Jan 2015 07:31:45 GMT] [info] [<0 data-blogger-escaped-.290.0="">] OS Process #Port<0 data-blogger-escaped-.2666=""> Log :: [["5/12","102d4440a25efcd79349859b67002b7f"]]
  10. [Sat, 03 Jan 2015 07:31:45 GMT] [info] [<0 data-blogger-escaped-.290.0="">] OS Process #Port<0 data-blogger-escaped-.2666=""> Log :: [900]


今度は、一回のreduce関数がcallされる時にはkeysには日付ごとのリストが集められていることがわかります。(つまり3回reduce関数はcallされています)これではっきりと、想定した動作をなぜしなかったのかがわかります。

0 件のコメント:

コメントを投稿