しろかい!

アプリ開発や機械学習などの開発Tips.

Apache Storm Trident Tutorial まとめ

改めてStorm Trident tutorialを読み直してみたのでまとめてみた.
誤訳あるかもしれないので信頼はしないでください^^;
(報告していただけると喜びます)

それと,上記ページの日本語訳というよりは,自分がメモしておきたかった部分をまとめたものになっています.
(上記ページの内容を網羅しているわけではありません.機会があればまとめます.)

Tridentとは

  • Storm上でリアルタイムコンピューティングを行うためのハイレベルの抽象化.
  • スループット(秒間100万メッセージ)と,状態を持つストリーム処理の結合.
  • TridentのコンセプトはPigやCascadingに似ている.
  • 次のような機能を持つ:joins, aggregations, grouping, functions, filters
  • 状態をインクリメンタルに変更するような処理を行うためのプリミティブ(基本的な命令)を提供(データベースとも連携).
  • 同じバッチを重複して処理しない(1回だけ処理する)ことを保証.

Components

Spout

  • 指定されたデータソースからデータを読み込み,ストリームに流し続す.
  • 通常は,KestrelやKafkaなどのqueue brokerからデータを取得し,ストリームに流す.
  • Tridentでは,ストリームを小さなバッチとして処理する.この例のspoutでは3文毎のバッチにしているが,通常はやってくるデータ量に応じて1000~1000000ぐらいのデータを1つのバッチにするらしい.

Operation

  • Tridentではspoutからやってきたバッチに対する処理を行えるAPIを提供.それらのAPIは,Hadoopを抽象化したものであるPigやCascadingに類似している.
  • バッチ毎に,joins, aggregations, run functions, run filtersなどが行える.
  • バッチを集めて処理することや,その結果を永続層に保存することも可能.

最適化

Tridentでは,以下の2つのことが自動的に行われる.

  • 状態の読み書きを行うOperation (persistentAggregateやstateQueryなど) は自動的にバッチで行われる.例えば20個のリクエストが合った場合に,1つずつ処理すると時間が掛かるが,まとめて1回のリクエストで済ませてしまうことで,時間を短縮できる.
  • ネットワーク上にtupleを流す前に,可能な限りそのノード上で処理を行ってしまう.(MapReduceのCombinerのような処理を自動的に行なってくれる→CombinerAggregatorインタフェース)

※以下2つの例のソースコードGitHubからDLできます.
storm/examples/storm-starter at master · apache/storm · GitHub

例1:WordCount

この例では2つのことを行う.

  1. ストリームから文を受け取り,単語を数え上げる(wordcount).
  2. 単語リストからカウント合計を得るためにクエリを実行する.

1の処理と2の処理は別々のストリームで実行される.

1の処理ストリームの定義

TridentState wordCounts =
     topology.newStream("spout1", spout)
       .each(new Fields("sentence"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
       .parallelismHint(6);

TridentStateは,persistentAggregateした結果を状態(state)として保持できる.

2の処理ストリームの定義

topology.newDRPCStream("words")
       .each(new Fields("args"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
       .each(new Fields("count"), new FilterNull())
       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

外部からDRPCClientを起動することで,このストリームにクエリを投げられる.
この例では,例えば「cat dog the man」というクエリを投げると,TridentStateに保存された「cat」「dog」「the」「man」のカウントを合計した値を返してくれる.

DRPCClientを起動してクエリを投げるには,以下のjavaプログラムを実行すれば良い.

DRPCClientTest.java

import backtype.storm.utils.DRPCClient;

public class DRPCClientTest {
    // DRPCサーバーのホスト名 or IPアドレス
    static final String DRPC_SERVER_LOCATION = "hogehoge.com";
    // DRPCサーバーのポート番号
    static final int PORT = 3772;
    
    public static void main(String[] args) throws Exception {
        // DRPCClientの起動
        DRPCClient client = new DRPCClient(DRPC_SERVER_LOCATION, PORT);
        // wordsというストリームにクエリ「cat dog the man」を投げる.
        System.out.println(client.execute("words", "cat dog the man"));
        // [[500]]のような形式でカウントが標準出力に表示されるはず.
    }
}

例2:Reach

要求されたURLのReachを算出するTopology.
ここでいうReachとは,Twitter上でそのURLを見た(URLを含むTweetを見た)ユニークユーザーの数である.
つまり,Aさんがfoo.comというURL付きのTweetをしたとする.この時,Aさんのフォロワーは50人いるとすれば,Reachは50となる.
さらに,フォロワー数100人のBさんもfoo.comというURL付きのTweetをした.なお,AさんとBさんの共通のフォロワーが10人いる.この時,Reachは50+100-10=140となる.

Reachでは2つの状態ストリーム(TridentState)と1つのDRPCストリーム(DRPCStream)を使用する.

TridentState urlToTweeters

URL毎にそのURLを含むTweetを行ったユーザを保持.

TridentState urlToTweeters =
       topology.newStaticState(getUrlToTweetersState());

TridentState tweetersToFollowers

ユーザ毎にフォロワーの一覧を保持.

TridentState tweetersToFollowers =
       topology.newStaticState(getTweeterToFollowersState());

DRPCStream

ここにURLを投げると,そのURLのReachの値を返してくれる.

       topology.newDRPCStream("reach")
                // urlToTweetersを取得
                .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
                // Listが返ってくるので,要素を1つ1つTupleに分解
                .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
                // 各Tupleを均一に各Workerに分散
                .shuffle()
                // tweetersToFollowersを取得
                .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(),
                        new Fields("followers"))
                // 直前の処理(stateQuery)は200並列で行う
                .parallelismHint(200)
                // Listが返ってくるので,要素を1つ1つTupleに分解
                .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
                // フォロワー毎にグルーピング
                .groupBy(new Fields("follower"))
                // 各グループごとにTupleを1つだけ送る(共通のフォロワーの排除)
                .aggregate(new One(), new Fields("one"))
                // 直前の処理(stateQuery)は20並列で行う
                .parallelismHint(20)
                // 上記aggregateから送られてきたTupleの合計数をカウント.
                .aggregate(new Count(), new Fields("reach"));

ちなみに,OneというクラスはCombinerAggregatorインタフェースを実装しており,MapReduceのCombinerとほぼ同等の機能を持つ(自動的にCombine処理が行われる).

public class One implements CombinerAggregator<Integer> {
   public Integer init(TridentTuple tuple) {
       return 1;
   }

   public Integer combine(Integer val1, Integer val2) {
       return 1;
   }

   public Integer zero() {
       return 1;
   }        
}

FieldとTupleについて

Tridentでは基本的に各Operation毎に出力するTupleのFieldの変更はできない.
ただし,以下に限って(?)可能.

  • function:入力Tupleへのfieldの追加
  • aggregate:fieldの置換(入力Tupleは破棄される)

Stateについて

重複処理をしないための状態管理方法について書かれている.
各バッチにはtransaction idが割り振られており,これで重複の区別を行う.

TridentとSpout/Boltの関係

Tridentは,自動的に最も効率よく処理できるようなSpout/Boltに内部で変換しているらしい.
具体的にはネットワーク通信の必要な処理(groupBy, shuffle, partitionByなど)で区切って,1つのBoltに変換しているらしい.

Tutorialの画像を拝借して例を挙げると,上の画像のようなTridentの構成なら,下の画像のようなSpout/Boltに変換されるらしい. http://storm.apache.org/documentation/images/trident-to-storm1.pnghttp://storm.apache.org/documentation/images/trident-to-storm2.png

何のまとまりもないまとめになってしまいましたが,要望あれば改善しますm(__)m