IK.AM

@making's tech note


Just Getting Storm, Twitter's Real-Time Data Processing Technology, Up and Running

🗓 Updated at 2011-11-26T08:49:21Z  🗓 Created at 2011-11-26T08:49:21Z

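Let's just get Storm running.

What Storm actually is will have to wait for another post. For now, read here, here, and here.

This time we run storm-starter, the project intended for learning.

Getting the storm-starter project

The build tool is Leiningen, from the Clojure world.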

$ git clone https://github.com/nathanmarz/storm-starter.git
$ cd storm-starter
$ lein deps # fetch the dependency libraries

Trying it locally

Java version

Build and run.

$ lein compile
$ java -cp `lein classpath` storm.starter.WordCountTopology

Log output that looks like a word count appears (omitted because it is long). What the output means is a topic for another time.
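
Leaving the actual log format aside, the computation behind it can be sketched without Storm. The following is only an illustration under assumptions: the class name `RunningWordCountSketch` and the method `runningCounts` are made up for this sketch, sentences are assumed to be split on single spaces, and the two sample sentences are borrowed from the Clojure source later in this post. It shows why every incoming word produces a new (word, count) pair rather than one final total.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Storm-free sketch (hypothetical class): split sentences, then emit a running count per word.
class RunningWordCountSketch {

    // Returns one "word=count" entry per incoming word, in arrival order,
    // similar in spirit to a counting bolt emitting (word, count) for every tuple.
    static List<String> runningCounts(List<String> sentences) {
        Map<String, Integer> counts = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String sentence : sentences) {
            // Assumption: words are separated by single spaces.
            for (String word : sentence.split(" ")) {
                int count = counts.getOrDefault(word, 0) + 1;
                counts.put(word, count);
                emitted.add(word + "=" + count);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> sentences = List.of("the man petted the dog", "a little brown dog");
        System.out.println(runningCounts(sentences));
    }
}
```

Running `main` prints `[the=1, man=1, petted=1, the=2, dog=1, a=1, little=1, brown=1, dog=2]`: a word seen twice shows up twice, with an increasing count.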

Clojure version

To run it locally, I changed the main function in src/clj/storm/starter/clj/word_count.clj as follows.

;; (defn -main [name]
;;   (StormSubmitter/submitTopology
;;    name 
;;    {TOPOLOGY-DEBUG true
;;     TOPOLOGY-WORKERS 3}
;;    (mk-topology))) 

(defn -main [& args]
  (run-local!))

Build and run!

$ lein compile
$ java -cp `lein classpath` storm.starter.clj.word_count

The same log as in the Java version appears (omitted because it is long).

WordCount source

Java version

The Spout implementation class is omitted.

package storm.starter;

import storm.starter.spout.RandomSentenceSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.ShellBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;

/**
 * This topology demonstrates Storm's stream groupings and multilang capabilities.
 */
public class WordCountTopology {
    public static class SplitSentence extends ShellBolt implements IRichBolt {
        
        public SplitSentence() {
            super("python", "splitsentence.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }  
    
    public static class WordCount implements IBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void prepare(Map conf, TopologyContext context) {
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if(count==null) count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void cleanup() {
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }
    
    public static void main(String[] args) throws Exception {
        
        TopologyBuilder builder = new TopologyBuilder();
        
        builder.setSpout(1, new RandomSentenceSpout(), 5);
        
        builder.setBolt(2, new SplitSentence(), 8)
                 .shuffleGrouping(1);
        builder.setBolt(3, new WordCount(), 12)
                 .fieldsGrouping(2, new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        
        if(args!=null && args.length > 0) {
            conf.setNumWorkers(3);
            
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {        
            conf.setMaxTaskParallelism(3);

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);

            cluster.shutdown();
        }
    }
}
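
In the topology above, `fieldsGrouping(2, new Fields("word"))` is what lets each `WordCount` task keep its own private `HashMap`: tuples carrying the same "word" value are always routed to the same task, whereas `shuffleGrouping` spreads tuples without regard to content. The idea can be sketched in plain Java as below. This is an illustration only, not Storm's actual routing code: the class `FieldsGroupingSketch`, the methods `taskFor` and `route`, and the hash-modulo rule are assumptions made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the idea behind a fields grouping; not Storm's implementation.
class FieldsGroupingSketch {

    // Assumption: the target task is derived from the hash of the grouping field.
    // floorMod keeps the index non-negative even for negative hash codes.
    static int taskFor(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    // Groups the incoming words by the task index that would receive them.
    static Map<Integer, List<String>> route(List<String> words, int numTasks) {
        Map<Integer, List<String>> byTask = new TreeMap<>();
        for (String word : words) {
            byTask.computeIfAbsent(taskFor(word, numTasks), k -> new ArrayList<>()).add(word);
        }
        return byTask;
    }

    public static void main(String[] args) {
        // 12 matches the parallelism given to WordCount in the topology above.
        System.out.println(route(List.of("the", "dog", "the", "man", "dog"), 12));
    }
}
```

Because both occurrences of "the" land on one task index, a single map on that task sees every "the" and its count stays correct. With a shuffle-style distribution the same word could be split across several tasks, each holding a partial count.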

Clojure version

This really feels like a DSL!

(ns storm.starter.clj.word-count
  (:import [backtype.storm StormSubmitter LocalCluster])
  (:use [backtype.storm clojure config])
  (:gen-class))

(defspout sentence-spout ["sentence"]
  [conf context collector]
  (let [sentences ["a little brown dog"
                   "the man petted the dog"
                   "four score and seven years ago"
                   "an apple a day keeps the doctor away"]]
    (spout
     (nextTuple []
       (Thread/sleep 100)
       (emit-spout! collector [(rand-nth sentences)])         
       )
     (ack [id]
        ;; You only need to define this method for reliable spouts
        ;; (such as one that reads off of a queue like Kestrel)
        ;; This is an unreliable spout, so it does nothing here
        ))))

(defspout sentence-spout-parameterized ["word"] {:params [sentences] :prepare false}
  [collector]
  (Thread/sleep 500)
  (emit-spout! collector [(rand-nth sentences)]))

(defbolt split-sentence ["word"] [tuple collector]
  (let [words (.split (.getString tuple 0) " ")]
    (doseq [w words]
      (emit-bolt! collector [w] :anchor tuple))
    (ack! collector tuple)
    ))

(defbolt word-count ["word" "count"] {:prepare true}
  [conf context collector]
  (let [counts (atom {})]
    (bolt
     (execute [tuple]
       (let [word (.getString tuple 0)]
         (swap! counts (partial merge-with +) {word 1})
         (emit-bolt! collector [word (@counts word)] :anchor tuple)
         (ack! collector tuple)
         )))))

(defn mk-topology []

  (topology
   {1 (spout-spec sentence-spout)
    2 (spout-spec (sentence-spout-parameterized
                   ["the cat jumped over the door"
                    "greetings from a faraway land"])
                  :p 2)}
   {3 (bolt-spec {1 :shuffle 2 :shuffle}
                 split-sentence
                 :p 5)
    4 (bolt-spec {3 ["word"]}
                 word-count
                 :p 6)}))

(defn run-local! []
  (let [cluster (LocalCluster.)]
    (.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))
    (Thread/sleep 10000)
    (.shutdown cluster)
    ))

;; (defn -main [name]
;;   (StormSubmitter/submitTopology
;;    name
;;    {TOPOLOGY-DEBUG true
;;     TOPOLOGY-WORKERS 3}
;;    (mk-topology)))

(defn -main [& args]
  (run-local!))
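
The `(swap! counts (partial merge-with +) {word 1})` call in `word-count` adds 1 to the entry for `word`, creating the entry if it does not exist yet. For readers more at home in Java, roughly the same update can be written with `Map.merge`. This is a sketch for comparison, not code from storm-starter; `MergeWithSketch` and `increment` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical Java counterpart of the merge-with + update in the Clojure bolt.
class MergeWithSketch {

    // Stores 1 if the word is absent, otherwise old value + 1; returns the new count.
    static int increment(Map<String, Integer> counts, String word) {
        return counts.merge(word, 1, Integer::sum);
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        increment(counts, "dog");
        increment(counts, "dog");
        increment(counts, "cat");
        System.out.println(counts.get("dog") + " " + counts.get("cat"));
    }
}
```

Running `main` prints `2 1`. One difference to keep in mind: the Clojure version holds the map in an atom, while a plain `HashMap` as used here is not safe for concurrent updates.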

Trying it on a cluster

Someday!
