しろかい!

アプリ開発や機械学習などの開発Tips.

Tridentの並列数の指定方法

Tridentの並列数の指定方法についてまとめました.

Tridentでは,ストリームを定義する時にparallelismHint()メソッドを書くことで,直前のOperationの並列数の指定が可能.
ただし,単に記述しただけではダメみたいです.

例えば,以下の様なストリームを定義します.

Stream inputStream = topology
    .newStream("sentence", spout)
    .each(new Fields("field"), new Function1())
    .each(new Fields("field"), new Function2(), new Fields())
    .parallelismHint(10);

これを実行すると,Function2()が10並列になるかと思いきや,されません.
しかし,以下のようにFunction1()Function2()の間にsuffle()を挟むと,Function2()が10並列で動くようになります.

Stream inputStream = topology
    .newStream("sentence", spout)
    .each(new Fields("field"), new Function1())
    .shuffle()
    .each(new Fields("field"), new Function2(), new Fields())
    .parallelismHint(10);

以前書いた記事「Apache Storm Trident Tutorial まとめ - しろかい!」に書いてあるように,Tridentでは実行時に自動的にSpout/Boltに各Operationが変換されます.その際,.shuffle()groupBy()と言った,ネットワーク通信が必須となる(Tupleを別のノードに送る)操作で区切って,Boltを生成します.
前者のソースコードでは,Function1()Function2()は1つのBoltにまとめられてしまい,またSpoutと同一ノードで処理可能であるため,並列化されません(Spoutは1個なので,その数に合わせられてしまう).
一方,後者のソースコードでは.shuffle()はネットワーク通信が必須であるため,Function1()Function2()は別々のBoltとして生成されます.その際, parallelismHint(10)を指定しているので,10個に並列されます.

ちなみに,以下のように書くと,Function1()Function2()がまとめられたBoltが10個生成されます.

Stream inputStream = topology
    .newStream("sentence", spout)
    .shuffle()
    .each(new Fields("field"), new Function1())
    .each(new Fields("field"), new Function2(), new Fields())
    .parallelismHint(10);

さらにもう1つ.
以下のcodeを実行すると,Function1()のBoltは10個,Function2()のBoltは1個生成されます.

Stream inputStream = topology
    .newStream("sentence", spout)
    .shuffle()
    .each(new Fields("field"), new Function1())
    .parallelismHint(10);
    .shuffle()
    .each(new Fields("field"), new Function2(), new Fields())
    .parallelismHint(1);

以上,Tridentでの並列数の指定方法でした.