Tridentの並列数の指定方法
Tridentの並列数の指定方法についてまとめました.
Tridentでは,ストリームを定義する時にparallelismHint()
メソッドを書くことで,直前のOperationの並列数の指定が可能.
ただし,単に記述しただけではダメみたいです.
例えば,以下の様なストリームを定義します.
Stream inputStream = topology .newStream("sentence", spout) .each(new Fields("field"), new Function1()) .each(new Fields("field"), new Function2(), new Fields()) .parallelismHint(10);
これを実行すると,Function2()
が10並列になるかと思いきや,されません.
しかし,以下のようにFunction1()
とFunction2()
の間にsuffle()
を挟むと,Function2()が10並列で動くようになります.
Stream inputStream = topology .newStream("sentence", spout) .each(new Fields("field"), new Function1()) .shuffle() .each(new Fields("field"), new Function2(), new Fields()) .parallelismHint(10);
以前書いた記事「Apache Storm Trident Tutorial まとめ - しろかい!」に書いてあるように,Tridentでは実行時に自動的にSpout/Boltに各Operationが変換されます.その際,.shuffle()
やgroupBy()
と言った,ネットワーク通信が必須となる(Tupleを別のノードに送る)操作で区切って,Boltを生成します.
前者のソースコードでは,Function1()
とFunction2()
は1つのBoltにまとめられてしまい,またSpoutと同一ノードで処理可能であるため,並列化されません(Spoutは1個なので,その数に合わせられてしまう).
一方,後者のソースコードでは.shuffle()
はネットワーク通信が必須であるため,Function1()
とFunction2()
は別々のBoltとして生成されます.その際, parallelismHint(10)
を指定しているので,10個に並列されます.
ちなみに,以下のように書くと,Function1()
とFunction2()
がまとめられたBoltが10個生成されます.
Stream inputStream = topology .newStream("sentence", spout) .shuffle() .each(new Fields("field"), new Function1()) .each(new Fields("field"), new Function2(), new Fields()) .parallelismHint(10);
さらにもう1つ.
以下のcodeを実行すると,Function1()
のBoltは10個,Function2()
のBoltは1個生成されます.
Stream inputStream = topology .newStream("sentence", spout) .shuffle() .each(new Fields("field"), new Function1()) .parallelismHint(10); .shuffle() .each(new Fields("field"), new Function2(), new Fields()) .parallelismHint(1);
以上,Tridentでの並列数の指定方法でした.