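Just get Storm running.
What Storm actually is can wait for another time; for that, read this, this, and this.
This time we run storm-starter, the sample project for learning Storm.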
Getting the storm-starter project
The build tool is Leiningen, the Clojure build tool.
$ git clone https://github.com/nathanmarz/storm-starter.git
$ lein deps # fetch dependencies
Trying it locally
Java version
Build & run.
$ lein compile
$ java -cp `lein classpath` storm.starter.WordCountTopology
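It prints a pile of log output that looks like a word count (too long to include here). What the output means will have to wait for another time.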
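As a rough guide to that log: the WordCount bolt emits a (word, running count) pair every time it receives a word. As a result, the same word shows up again and again with a growing count. Below is a minimal single-threaded sketch of that behavior with no Storm dependency. The class name `WordCountSketch` is hypothetical. Splitting on single spaces is an assumption borrowed from the Clojure `split-sentence` bolt; the Java version delegates splitting to `splitsentence.py`, which is not shown here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Storm-free model of the SplitSentence -> WordCount path.
public class WordCountSketch {
    // Splits each sentence on single spaces (assumption, as in the Clojure
    // split-sentence bolt) and records "word count" with the running total,
    // the way WordCount.execute emits (word, count) for every incoming word.
    public static List<String> run(List<String> sentences) {
        Map<String, Integer> counts = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String sentence : sentences) {
            for (String word : sentence.split(" ")) {
                int count = counts.getOrDefault(word, 0) + 1;
                counts.put(word, count);
                emitted.add(word + " " + count);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        for (String line : run(List.of("the man petted the dog", "a little brown dog"))) {
            System.out.println(line);
        }
    }
}
```

The repeated words ("the", "dog") come out twice, the second time with count 2. That growing-count pattern is what makes the real log look like a word count.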
Clojure version
To run it locally, I changed the -main function in src/clj/storm/starter/clj/word_count.clj as follows:
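;; Original -main: submits the topology to a cluster under the given name
;; (defn -main [name]
;;   (StormSubmitter/submitTopology
;;    name
;;    {TOPOLOGY-DEBUG true
;;     TOPOLOGY-WORKERS 3}
;;    (mk-topology)))

;; Changed to run in a LocalCluster instead
(defn -main [& args]
  (run-local!))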
Build & run!
$ lein compile
$ java -cp `lein classpath` storm.starter.clj.word_count
It prints the same kind of log as the Java version (again, too long to include).
WordCount source
Java version
The Spout implementation class (RandomSentenceSpout) is omitted.
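Since the spout is omitted, here is a hypothetical plain-Java stand-in that mimics what the Clojure `sentence-spout` does. It waits 100 ms, then returns a sentence picked at random from a fixed list. The class `RandomSentenceSketch` and its methods are illustrative, and so is the assumption that the real `RandomSentenceSpout` behaves this way. A real spout would emit through a `SpoutOutputCollector` in `nextTuple()` rather than return a value.

```java
import java.util.List;
import java.util.Random;

// Hypothetical stand-in for RandomSentenceSpout; the real class is not shown in this post.
public class RandomSentenceSketch {
    // Sentence list borrowed from the Clojure sentence-spout (assumption for the Java side).
    private static final List<String> SENTENCES = List.of(
            "a little brown dog",
            "the man petted the dog",
            "four score and seven years ago",
            "an apple a day keeps the doctor away");

    private final Random random;

    public RandomSentenceSketch(Random random) {
        this.random = random;
    }

    public static List<String> sentences() {
        return SENTENCES;
    }

    // Analogous to nextTuple(): pause briefly, then produce one random sentence.
    public String nextSentence() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return SENTENCES.get(random.nextInt(SENTENCES.size()));
    }

    public static void main(String[] args) {
        RandomSentenceSketch spout = new RandomSentenceSketch(new Random(42));
        for (int i = 0; i < 3; i++) {
            System.out.println(spout.nextSentence());
        }
    }
}
```

Passing in a `Random` makes the output reproducible for a fixed seed, which is handy when checking downstream counting logic.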
package storm.starter;

import storm.starter.spout.RandomSentenceSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.ShellBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;

/**
 * This topology demonstrates Storm's stream groupings and multilang capabilities.
 */
public class WordCountTopology {
    // Runs splitsentence.py as a subprocess via Storm's multilang support
    public static class SplitSentence extends ShellBolt implements IRichBolt {
        public SplitSentence() {
            super("python", "splitsentence.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    // Keeps per-task counts in memory and emits (word, running count)
    public static class WordCount implements IBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void prepare(Map conf, TopologyContext context) {
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if(count==null) count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void cleanup() {
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // Component 1: spout emitting random sentences, parallelism hint 5
        builder.setSpout(1, new RandomSentenceSpout(), 5);
        // Component 2: splits sentences; tuples from 1 are spread randomly
        builder.setBolt(2, new SplitSentence(), 8)
                .shuffleGrouping(1);
        // Component 3: counts words; the same "word" always goes to the same task
        builder.setBolt(3, new WordCount(), 12)
                .fieldsGrouping(2, new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        if(args!=null && args.length > 0) {
            // With a topology name argument: submit to a real cluster
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // Without arguments: run in-process for 10 seconds, then shut down
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}
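The key design point here is `fieldsGrouping(2, new Fields("word"))`. Each `WordCount` task keeps its own `HashMap`. Every occurrence of a given word must therefore reach the same task, or its count would be split across tasks.

The sketch below models this with no Storm dependency. Fields grouping is modeled as a hash of the word modulo the task count, and shuffle grouping as a random pick. It is a simplified model: the class and method names are hypothetical, and Storm's real hash function and shuffle strategy differ in detail. Still, it shows why shuffle grouping would break the counts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Simplified model of fieldsGrouping vs shuffleGrouping; not Storm's actual routing code.
public class GroupingSketch {
    // Fields grouping (model): the same word always maps to the same task index.
    public static int fieldsTask(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    // Routes every word to a task; each task keeps its own counts,
    // like each WordCount instance keeping its own HashMap.
    public static List<Map<String, Integer>> count(List<String> words, int numTasks,
                                                   boolean byField, Random random) {
        List<Map<String, Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new HashMap<>());
        }
        for (String word : words) {
            // Shuffle grouping (model): pick any task at random.
            int task = byField ? fieldsTask(word, numTasks) : random.nextInt(numTasks);
            tasks.get(task).merge(word, 1, Integer::sum);
        }
        return tasks;
    }

    // How many tasks hold a (possibly partial) count for the word.
    public static long tasksHolding(List<Map<String, Integer>> tasks, String word) {
        return tasks.stream().filter(t -> t.containsKey(word)).count();
    }

    public static void main(String[] args) {
        List<String> words = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            words.add("dog");
        }
        Random random = new Random(0);
        System.out.println("fields:  " + tasksHolding(count(words, 12, true, random), "dog"));
        System.out.println("shuffle: " + tasksHolding(count(words, 12, false, random), "dog"));
    }
}
```

With fields grouping, "dog" always lands in exactly one task, which holds the full count of 100. With shuffle grouping, the count ends up scattered in pieces across several tasks.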
Clojure version
This one really reads like a DSL!
(ns storm.starter.clj.word-count
  (:import [backtype.storm StormSubmitter LocalCluster])
  (:use [backtype.storm clojure config])
  (:gen-class))

(defspout sentence-spout ["sentence"]
  [conf context collector]
  (let [sentences ["a little brown dog"
                   "the man petted the dog"
                   "four score and seven years ago"
                   "an apple a day keeps the doctor away"]]
    (spout
     (nextTuple []
       (Thread/sleep 100)
       (emit-spout! collector [(rand-nth sentences)]))
     (ack [id]
       ;; You only need to define this method for reliable spouts
       ;; (such as one that reads off of a queue like Kestrel)
       ;; This is an unreliable spout, so it does nothing here
       ))))

(defspout sentence-spout-parameterized ["word"] {:params [sentences] :prepare false}
  [collector]
  (Thread/sleep 500)
  (emit-spout! collector [(rand-nth sentences)]))

(defbolt split-sentence ["word"] [tuple collector]
  (let [words (.split (.getString tuple 0) " ")]
    (doseq [w words]
      (emit-bolt! collector [w] :anchor tuple))
    (ack! collector tuple)))

(defbolt word-count ["word" "count"] {:prepare true}
  [conf context collector]
  (let [counts (atom {})]
    (bolt
     (execute [tuple]
       (let [word (.getString tuple 0)]
         (swap! counts (partial merge-with +) {word 1})
         (emit-bolt! collector [word (@counts word)] :anchor tuple)
         (ack! collector tuple))))))

(defn mk-topology []
  (topology
   {1 (spout-spec sentence-spout)
    2 (spout-spec (sentence-spout-parameterized
                   ["the cat jumped over the door"
                    "greetings from a faraway land"])
                  :p 2)}
   {3 (bolt-spec {1 :shuffle 2 :shuffle}
                 split-sentence
                 :p 5)
    4 (bolt-spec {3 ["word"]}
                 word-count
                 :p 6)}))

(defn run-local! []
  (let [cluster (LocalCluster.)]
    (.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))
    (Thread/sleep 10000)
    (.shutdown cluster)))

;; (defn -main [name]
;;   (StormSubmitter/submitTopology
;;    name
;;    {TOPOLOGY-DEBUG true
;;     TOPOLOGY-WORKERS 3}
;;    (mk-topology)))

(defn -main [& args]
  (run-local!))
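The Clojure `word-count` bolt manages its state differently from the Java `HashMap` version. It keeps an immutable map in an atom and replaces it via `swap!` with `(partial merge-with +)`. A rough Java analog uses `AtomicReference.updateAndGet` with a function that builds a new, merged map.

This is only an illustration of the idea; `MergeWithSketch` and `mergeWithPlus` are hypothetical names. The Clojure code re-reads the atom with `(@counts word)`, whereas the sketch reads the count from the value `updateAndGet` returns. The two agree as long as a single thread calls `execute`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Rough Java analog of the Clojure word-count bolt's state handling (hypothetical names).
public class MergeWithSketch {
    // Like (merge-with + a b): keys from both maps, values summed where keys collide.
    // Returns a new immutable map and leaves both inputs untouched.
    public static Map<String, Integer> mergeWithPlus(Map<String, Integer> a, Map<String, Integer> b) {
        Map<String, Integer> result = new HashMap<>(a);
        b.forEach((k, v) -> result.merge(k, v, Integer::sum));
        return Map.copyOf(result);
    }

    // Like (atom {}): a reference to an immutable map that is swapped, never mutated.
    private final AtomicReference<Map<String, Integer>> counts = new AtomicReference<>(Map.of());

    // Like (swap! counts (partial merge-with +) {word 1}), then looking up the new count.
    public int execute(String word) {
        Map<String, Integer> updated = counts.updateAndGet(m -> mergeWithPlus(m, Map.of(word, 1)));
        return updated.get(word);
    }

    public static void main(String[] args) {
        MergeWithSketch bolt = new MergeWithSketch();
        for (String w : "the man petted the dog".split(" ")) {
            System.out.println(w + " " + bolt.execute(w));
        }
    }
}
```

Like `swap!`, `updateAndGet` may retry its function under contention, so the function must be free of side effects. Building a fresh map each time satisfies that.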
Trying it on a cluster
Someday!