---
title: Twitterのリアルタイムデータ処理技術のStormをとにかく動かしてみる
tags: []
categories: ["Middleware", "DistributedSystem", "Storm"]
date: 2011-11-26T08:49:21Z
updated: 2011-11-26T08:49:21Z
---

Stormをとにかく動かす。

Stormって何？とかはまた今度。[ここ][1]とか[ここ][2]とか[ここ][3]を読んで。

今回は学習用の[storm-starter][4]を動かす。

### storm-starterプロジェクトを取得

ビルドツールにはClojureのleiningenを使用。

    $ git clone https://github.com/nathanmarz/storm-starter.git
    $ lein deps # 依存ライブラリの取得

### ローカルで試す
#### Java版

ビルド&実行。

    $ lein compile
    $ java -cp `lein classpath` storm.starter.WordCountTopology

なんかwordcountっぽいログが出ています（長いので略）。
出力結果の意味はまた今度。


#### Clojure版

ローカルで動かす用に`src/clj/storm/starter/clj/word_count.clj`のmain関数を変更しています。

    ;; (defn -main [name]
    ;;   (StormSubmitter/submitTopology
    ;;    name 
    ;;    {TOPOLOGY-DEBUG true
    ;;     TOPOLOGY-WORKERS 3}
    ;;    (mk-topology))) 

    (defn -main [& args]
      (run-local!))


ビルド&実行！

    $ lein compile
    $ java -cp `lein classpath` storm.starter.clj.word_count

Java版と同じログが出ます（長いので略）。



### WordCountソース
#### Java版
Spout実装クラスは略。

    package storm.starter;
    
    import storm.starter.spout.RandomSentenceSpout;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.task.ShellBolt;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.IBasicBolt;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * This topology demonstrates Storm's stream groupings and multilang capabilities.
     */
    public class WordCountTopology {
        public static class SplitSentence extends ShellBolt implements IRichBolt {
            
            public SplitSentence() {
                super("python", "splitsentence.py");
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
            }
        }  
        
        public static class WordCount implements IBasicBolt {
            Map<String, Integer> counts = new HashMap<String, Integer>();
    
            @Override
            public void prepare(Map conf, TopologyContext context) {
            }
    
            @Override
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                String word = tuple.getString(0);
                Integer count = counts.get(word);
                if(count==null) count = 0;
                count++;
                counts.put(word, count);
                collector.emit(new Values(word, count));
            }
    
            @Override
            public void cleanup() {
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word", "count"));
            }
        }
        
        public static void main(String[] args) throws Exception {
            
            TopologyBuilder builder = new TopologyBuilder();
            
            builder.setSpout(1, new RandomSentenceSpout(), 5);
            
            builder.setBolt(2, new SplitSentence(), 8)
                     .shuffleGrouping(1);
            builder.setBolt(3, new WordCount(), 12)
                     .fieldsGrouping(2, new Fields("word"));
    
            Config conf = new Config();
            conf.setDebug(true);
    
            
            if(args!=null && args.length > 0) {
                conf.setNumWorkers(3);
                
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {        
                conf.setMaxTaskParallelism(3);
    
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("word-count", conf, builder.createTopology());
                Thread.sleep(10000);
    
                cluster.shutdown();
            }
        }
    }


#### Clojure版

DSLって感じですね！

    (ns storm.starter.clj.word-count
      (:import [backtype.storm StormSubmitter LocalCluster])
      (:use [backtype.storm clojure config])
      (:gen-class))
    
    (defspout sentence-spout ["sentence"]
      [conf context collector]
      (let [sentences ["a little brown dog"
                       "the man petted the dog"
                       "four score and seven years ago"
                       "an apple a day keeps the doctor away"]]
        (spout
         (nextTuple []
           (Thread/sleep 100)
           (emit-spout! collector [(rand-nth sentences)])         
           )
         (ack [id]
            ;; You only need to define this method for reliable spouts
            ;; (such as one that reads off of a queue like Kestrel)
            ;; This is an unreliable spout, so it does nothing here
            ))))
    
    (defspout sentence-spout-parameterized ["word"] {:params [sentences] :prepare false}
      [collector]
      (Thread/sleep 500)
      (emit-spout! collector [(rand-nth sentences)]))
    
    (defbolt split-sentence ["word"] [tuple collector]
      (let [words (.split (.getString tuple 0) " ")]
        (doseq [w words]
          (emit-bolt! collector [w] :anchor tuple))
        (ack! collector tuple)
        ))
    
    (defbolt word-count ["word" "count"] {:prepare true}
      [conf context collector]
      (let [counts (atom {})]
        (bolt
         (execute [tuple]
           (let [word (.getString tuple 0)]
             (swap! counts (partial merge-with +) {word 1})
             (emit-bolt! collector [word (@counts word)] :anchor tuple)
             (ack! collector tuple)
             )))))
    
    (defn mk-topology []
    
      (topology
       {1 (spout-spec sentence-spout)
        2 (spout-spec (sentence-spout-parameterized
                       ["the cat jumped over the door"
                        "greetings from a faraway land"])
                      :p 2)}
       {3 (bolt-spec {1 :shuffle 2 :shuffle}
                     split-sentence
                     :p 5)
        4 (bolt-spec {3 ["word"]}
                     word-count
                     :p 6)}))
    
    (defn run-local! []
      (let [cluster (LocalCluster.)]
        (.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))
        (Thread/sleep 10000)
        (.shutdown cluster)
        ))
    
    ;; (defn -main [name]
    ;;   (StormSubmitter/submitTopology
    ;;    name
    ;;    {TOPOLOGY-DEBUG true
    ;;     TOPOLOGY-WORKERS 3}
    ;;    (mk-topology)))
    
    (defn -main [& args]
      (run-local!))


### クラスタで試す

いつか！

  [1]: https://github.com/nathanmarz/storm
  [2]: http://engineering.twitter.com/2011/08/storm-is-coming-more-details-and-plans.html
  [3]: http://www.infoq.com/jp/news/2011/09/twitter-storm-real-time-hadoop
  [4]: https://github.com/nathanmarz/storm-starter
