Apache Storm Trident Tutorial まとめ
改めてStorm Trident tutorialを読み直してみたのでまとめてみた.
誤訳あるかもしれないので信頼はしないでください^^;
(報告していただけると喜びます)
それと,上記ページの日本語訳というよりは,自分がメモしておきたかった部分をまとめたものになっています.
(上記ページの内容を網羅しているわけではありません.機会があればまとめます.)
Tridentとは
- Storm上でリアルタイムコンピューティングを行うためのハイレベルの抽象化.
- 高スループット(秒間100万メッセージ)と,状態を持つストリーム処理の結合.
- TridentのコンセプトはPigやCascadingに似ている.
- 次のような機能を持つ:joins, aggregations, grouping, functions, filters
- 状態をインクリメンタルに変更するような処理を行うためのプリミティブ(基本的な命令)を提供(データベースとも連携).
- 同じバッチを重複して処理しない(1回だけ処理する)ことを保証.
Components
Spout
- 指定されたデータソースからデータを読み込み,ストリームに流し続す.
- 通常は,KestrelやKafkaなどのqueue brokerからデータを取得し,ストリームに流す.
- Tridentでは,ストリームを小さなバッチとして処理する.この例のspoutでは3文毎のバッチにしているが,通常はやってくるデータ量に応じて1000~1000000ぐらいのデータを1つのバッチにするらしい.
Operation
- Tridentではspoutからやってきたバッチに対する処理を行えるAPIを提供.それらのAPIは,Hadoopを抽象化したものであるPigやCascadingに類似している.
- バッチ毎に,joins, aggregations, run functions, run filtersなどが行える.
- バッチを集めて処理することや,その結果を永続層に保存することも可能.
最適化
Tridentでは,以下の2つのことが自動的に行われる.
- 状態の読み書きを行うOperation (persistentAggregateやstateQueryなど) は自動的にバッチで行われる.例えば20個のリクエストが合った場合に,1つずつ処理すると時間が掛かるが,まとめて1回のリクエストで済ませてしまうことで,時間を短縮できる.
- ネットワーク上にtupleを流す前に,可能な限りそのノード上で処理を行ってしまう.(MapReduceのCombinerのような処理を自動的に行なってくれる→
CombinerAggregator
インタフェース)
※以下2つの例のソースコードはGitHubからDLできます.
storm/examples/storm-starter at master · apache/storm · GitHub
例1:WordCount
この例では2つのことを行う.
- ストリームから文を受け取り,単語を数え上げる(wordcount).
- 単語リストからカウント合計を得るためにクエリを実行する.
1の処理と2の処理は別々のストリームで実行される.
1の処理ストリームの定義
TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) .parallelismHint(6);
TridentState
は,persistentAggregate
した結果を状態(state)として保持できる.
2の処理ストリームの定義
topology.newDRPCStream("words") .each(new Fields("args"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")) .each(new Fields("count"), new FilterNull()) .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
外部からDRPCClientを起動することで,このストリームにクエリを投げられる.
この例では,例えば「cat dog the man」というクエリを投げると,TridentState
に保存された「cat」「dog」「the」「man」のカウントを合計した値を返してくれる.
DRPCClientを起動してクエリを投げるには,以下のjavaプログラムを実行すれば良い.
DRPCClientTest.java
import backtype.storm.utils.DRPCClient; public class DRPCClientTest { // DRPCサーバーのホスト名 or IPアドレス static final String DRPC_SERVER_LOCATION = "hogehoge.com"; // DRPCサーバーのポート番号 static final int PORT = 3772; public static void main(String[] args) throws Exception { // DRPCClientの起動 DRPCClient client = new DRPCClient(DRPC_SERVER_LOCATION, PORT); // wordsというストリームにクエリ「cat dog the man」を投げる. System.out.println(client.execute("words", "cat dog the man")); // [[500]]のような形式でカウントが標準出力に表示されるはず. } }
例2:Reach
要求されたURLのReachを算出するTopology.
ここでいうReachとは,Twitter上でそのURLを見た(URLを含むTweetを見た)ユニークユーザーの数である.
つまり,Aさんがfoo.comというURL付きのTweetをしたとする.この時,Aさんのフォロワーは50人いるとすれば,Reachは50となる.
さらに,フォロワー数100人のBさんもfoo.comというURL付きのTweetをした.なお,AさんとBさんの共通のフォロワーが10人いる.この時,Reachは50+100-10=140となる.
Reachでは2つの状態ストリーム(TridentState
)と1つのDRPCストリーム(DRPCStream
)を使用する.
TridentState urlToTweeters
URL毎にそのURLを含むTweetを行ったユーザを保持.
TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState());
TridentState tweetersToFollowers
ユーザ毎にフォロワーの一覧を保持.
TridentState tweetersToFollowers = topology.newStaticState(getTweeterToFollowersState());
DRPCStream
ここにURLを投げると,そのURLのReachの値を返してくれる.
topology.newDRPCStream("reach") // urlToTweetersを取得 .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")) // Listが返ってくるので,要素を1つ1つTupleに分解 .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")) // 各Tupleを均一に各Workerに分散 .shuffle() // tweetersToFollowersを取得 .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")) // 直前の処理(stateQuery)は200並列で行う .parallelismHint(200) // Listが返ってくるので,要素を1つ1つTupleに分解 .each(new Fields("followers"), new ExpandList(), new Fields("follower")) // フォロワー毎にグルーピング .groupBy(new Fields("follower")) // 各グループごとにTupleを1つだけ送る(共通のフォロワーの排除) .aggregate(new One(), new Fields("one")) // 直前の処理(stateQuery)は20並列で行う .parallelismHint(20) // 上記aggregateから送られてきたTupleの合計数をカウント. .aggregate(new Count(), new Fields("reach"));
ちなみに,One
というクラスはCombinerAggregator
インタフェースを実装しており,MapReduceのCombinerとほぼ同等の機能を持つ(自動的にCombine処理が行われる).
public class One implements CombinerAggregator<Integer> { public Integer init(TridentTuple tuple) { return 1; } public Integer combine(Integer val1, Integer val2) { return 1; } public Integer zero() { return 1; } }
FieldとTupleについて
Tridentでは基本的に各Operation毎に出力するTupleのFieldの変更はできない.
ただし,以下に限って(?)可能.
function
:入力Tupleへのfieldの追加aggregate
:fieldの置換(入力Tupleは破棄される)
Stateについて
重複処理をしないための状態管理方法について書かれている.
各バッチにはtransaction idが割り振られており,これで重複の区別を行う.
TridentとSpout/Boltの関係
Tridentは,自動的に最も効率よく処理できるようなSpout/Boltに内部で変換しているらしい.
具体的にはネットワーク通信の必要な処理(groupBy
, shuffle
, partitionBy
など)で区切って,1つのBoltに変換しているらしい.
Tutorialの画像を拝借して例を挙げると,上の画像のようなTridentの構成なら,下の画像のようなSpout/Boltに変換されるらしい. ↓
何のまとまりもないまとめになってしまいましたが,要望あれば改善しますm(__)m