Apache Storm Environment Setup and Running Sample Source Code

This document describes the steps we followed to set up Apache Storm in-house.

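Table of Contents

Apache Storm Environment Setup and Running Sample Source Code

  1. Apache Storm overview
  2. Apache Storm installation procedure
  3. Developing and running the sample source
  4. Sample source
  5. Downloads
  6. References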

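1. Apache Storm Overview

Apache Storm is an open-source, fault-tolerant, high-speed distributed framework for near-real-time big data processing.

Features

  ◎ Rather than accumulating stream data and then starting to process it, Storm processes the data continuously as it is read, without stopping.
  ◎ It is a framework for fast, near-real-time analysis of large volumes of continuously generated data.
  ◎ Because it processes records one at a time in memory, it can deliver low-latency, highly real-time processing.

Suitable use cases

Apache Storm is well suited to scenarios that demand real-time processing, such as the following.

  ◎ Fraud detection
  ◎ Clickstream analysis
  ◎ Alerting on financial transactions
  ◎ Monitoring large numbers of IoT devices
  ◎ Social analytics
  ◎ Network monitoring, and so on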

2. Apache Storm Installation Procedure

1. Installing wget

① wget is used to download JDK 8, so install it if it is not already installed.

[root@AF-SV ~]# yum install wget 
Loaded plugins: fastestmirror
Loading mirror speeds from cached hostfile
* base: ftp.iij.ad.jp
* extras: ftp.iij.ad.jp
* updates: ftp.iij.ad.jp
base
    | 3.7 kB  00:00
extras
    | 3.4 kB  00:00
updates
    | 3.4 kB  00:00
Setting up Install Process
Resolving Dependencies
--> Running transaction check
---> Package wget.x86_64 0:1.12-8.el6 will be installed
--> Finished Dependency Resolution

Dependencies Resolved

======================================================
 Package  Arch  Version  Repository  Size
======================================================
Installing:
 wget   x86_64  1.12-8.el6  base  484 k

Transaction Summary
======================================================
Install 1 Package(s)

Total download size: 484 k
Installed size: 1.8 M

② When prompted to confirm the installation, enter [y].

Is this ok [y/N]:y
Downloading Packages:
wget-1.12-8.el6.x86_64.rpm
        | 484 kB  00:00
Running rpm_check_debug
Running Transaction Test
Transaction Test Succeeded
Running Transaction
  Installing : wget-1.12-8.el6.x86_64
                   1/1
  Verifying : wget-1.12-8.el6.x86_64
                   1/1

Installed:
  wget.x86_64 0:1.12-8.el6

Complete!

2. Installing the JDK

① Download the 64-bit JDK 8 for the Linux platform from Oracle.

[root@AF-SV ~]# wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u112-b15/jdk-8u112-linux-x64.rpm
--2016-12-20 00:25:28-- http://download.oracle.com/otn-pub/java/jdk/8u112-b15/jdk-8u112-linux-x64.rpm
Resolving download.oracle.com... 104.80.89.16, 104.80.89.58
Connecting to download.oracle.com|104.80.89.16|:80... connected.
HTTP request sent, awaiting response... 302 Moved Temporarily
Location: https://edelivery.oracle.com/otn-pub/java/jdk/8u112-b15/jdk-8u112-linux-x64.rpm [following]
--2016-12-20 00:25:28-- https://edelivery.oracle.com/otn-pub/java/jdk/8u112-b15/jdk-8u112-linux-x64.rpm
Resolving edelivery.oracle.com... 23.47.129.179
Connecting to edelivery.oracle.com|23.47.129.179|:443... connected.
HTTP request sent, awaiting response... 302 Moved Temporarily
Location: http://download.oracle.com/otn-pub/java/jdk/8u112-b15/jdk-8u112-linux-x64.rpm?AuthParam=1482161248_18c86e5cb00e8780bc3d79fe96b79290 [following]
--2016-12-20 00:25:28-- http://download.oracle.com/otn-pub/java/jdk/8u112-b15/jdk-8u112-linux-x64.rpm?AuthParam=1482161248_18c86e5cb00e8780bc3d79fe96b79290
Connecting to download.oracle.com|104.80.89.16|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 167741674 (160M) [application/x-redhat-package-manager]
Saving to: “jdk-8u112-linux-x64.rpm”
100%[========================>] 167,741,674 4.62M/s in 39s
2016-12-20 00:26:07 (4.13 MB/s) - “jdk-8u112-linux-x64.rpm” saved [167741674/167741674]

② Install the downloaded rpm package.

[root@AF-SV ~]# rpm -ivh jdk-8u112-linux-x64.rpm
Preparing... ########################################### [100%]
   1:jdk1.8.0_112 ########################################### [100%]
Unpacking JAR files...
      tools.jar...
      plugin.jar...
      javaws.jar...
      deploy.jar...
      rt.jar...
      jsse.jar...
      charsets.jar...
      localedata.jar...

③ Verify that Java is installed correctly.

[root@AF-SV ~]# java -version
java version "1.8.0_112"
Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)

④ Set the path (JAVA_HOME).

[root@AF-SV java]# export JAVA_HOME=/usr/java/default
[root@AF-SV java]# source ~/.bashrc

3. Installing ZooKeeper

① Move to /opt.

[root@AF-SV java]# cd /opt

② Download ZooKeeper (ver. 3.4.9).

[root@AF-SV opt]# wget http://www-eu.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
--2016-12-20 00:29:36-- http://www-eu.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
Resolving www-eu.apache.org... 88.198.26.2
Connecting to www-eu.apache.org|88.198.26.2|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 22724574 (22M) [application/x-gzip]
Saving to: “zookeeper-3.4.9.tar.gz”

100%[=====================>] 22,724,574 3.51M/s in 7.9s

2016-12-20 00:29:44 (2.74 MB/s) - “zookeeper-3.4.9.tar.gz” saved [22724574/22724574]

③ Extract the archive.

[root@AF-SV opt]# tar -zxvf zookeeper-3.4.9.tar.gz

④ Rename the directory to something easier to recognize.

[root@AF-SV opt]# mv zookeeper-3.4.9 zookeeper

⑤ Create the data directory.

[root@AF-SV conf]# mkdir /opt/zookeeper/data/

⑥ Create the configuration file zoo.cfg.

[root@AF-SV conf]# vi /opt/zookeeper/conf/zoo.cfg

⑦ Edit the configuration file.

[root@AF-SV conf]# cat /opt/zookeeper/conf/zoo.cfg
tickTime=2000
dataDir=/opt/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

4. Starting ZooKeeper

[root@AF-SV opt]# /opt/zookeeper/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg

5. Installing apache-storm

① Download Apache Storm (ver. 1.0.2).

[root@AF-SV opt]# wget http://www-eu.apache.org/dist/storm/apache-storm-1.0.2/apache-storm-1.0.2.tar.gz
--2016-12-20 00:34:57-- http://www-eu.apache.org/dist/storm/apache-storm-1.0.2/apache-storm-1.0.2.tar.gz
Resolving www-eu.apache.org... 88.198.26.2
Connecting to www-eu.apache.org|88.198.26.2|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 179161400 (171M) [application/x-gzip]
Saving to: “apache-storm-1.0.2.tar.gz”

100%[====================>] 179,161,400 4.50M/s in 42s

2016-12-20 00:35:39 (4.11 MB/s) - “apache-storm-1.0.2.tar.gz” saved [179161400/179161400]

② Extract the downloaded file.

[root@AF-SV opt]# tar -zxvf apache-storm-1.0.2.tar.gz

③ Rename the directory to something easier to recognize.

[root@AF-SV opt]# mv apache-storm-1.0.2 storm

④ Create the data directory.

[root@AF-SV storm]# mkdir /opt/storm/data/

⑤ Create the configuration file storm.yaml.

[root@AF-SV conf]# vi storm.yaml

⑥ Edit the configuration file.

[root@AF-SV opt]# cat /opt/storm/conf/storm.yaml
storm.zookeeper.servers:
- "localhost"
storm.local.dir: "/opt/storm/data"
nimbus.host: "localhost"
supervisor.slots.ports:
 - 6700
 - 6701
 - 6702
 - 6703

6. Adding the host name to hosts

① Open the hosts file for editing.

[root@AF-SV bin]# vi /etc/hosts

② Edit the file.

[root@AF-SV bin]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 AF-SV.CO12
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6

③ Check the files under /opt/storm/bin.

[root@AF-SV opt]# cd /opt/storm/bin/
[root@AF-SV bin]# ll
total 60
-rwxr-xr-x. 1 502 games 4214 Jul 27 04:11 flight.bash
-rwxr-xr-x. 1 502 games 2179 Jul 27 04:11 storm
-rwxr-xr-x. 1 502 games 8925 Jul 27 04:11 storm.cmd
-rwxr-xr-x. 1 502 games 4147 Jul 27 04:11 storm-config.cmd
-rwxr-xr-x. 1 502 games 28323 Jul 27 04:11 storm.py

7. Starting Storm Nimbus

[root@AF-SV bin]# ./storm nimbus &
[1] 2636
# Running: /usr/java/default/bin/java -server -Ddaemon.name=nimbus -Dstorm.options= -Dstorm.home=/opt/storm -Dstorm.log.dir=/opt/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/storm/lib/storm-core-1.0.2.jar:/opt/storm/lib/storm-rename-hack-1.0.2.jar:/opt/storm/lib/reflectasm-1.10.1.jar:/opt/storm/lib/objenesis-2.1.jar:/opt/storm/lib/minlog-1.3.0.jar:
/opt/storm/lib/asm-5.0.3.jar:/opt/storm/lib/log4j-api-2.1.jar:/opt/storm/lib/slf4j-api-1.7.7.jar:/opt/storm/lib/kryo-3.0.3.jar:/opt/storm/lib/clojure-1.7.0.jar:/opt/storm/lib/log4j-core-2.1.jar:/opt/storm/lib/disruptor-3.3.2.jar:/opt/storm/lib/log4j-slf4j-impl-2.1.jar:/opt/storm/lib/log4j-over-slf4j-1.6.6.jar:/opt/storm/lib/servlet-api-2.5.jar:/opt/storm/conf -Xmx1024m -Dlogfile.name=nimbus.log -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector -Dlog4j.configurationFile=/opt/storm/log4j2/cluster.xml org.apache.storm.daemon.nimbus

8. Starting the Storm Supervisor

[root@AF-SV bin]# ./storm supervisor &
[1] 2721
# Running: /usr/java/default/bin/java -server -Ddaemon.name=supervisor -Dstorm.options= -Dstorm.home=/opt/storm -Dstorm.log.dir=/opt/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/storm/lib/storm-core-1.0.2.jar:/opt/storm/lib/storm-rename-hack-1.0.2.jar:/opt/storm/lib/reflectasm-1.10.1.jar:/opt/storm/lib/objenesis-2.1.jar:/opt/storm/lib/minlog-1.3.0.jar:
/opt/storm/lib/asm-5.0.3.jar:/opt/storm/lib/log4j-api-2.1.jar:/opt/storm/lib/slf4j-api-1.7.7.jar:/opt/storm/lib/kryo-3.0.3.jar:/opt/storm/lib/clojure-1.7.0.jar:/opt/storm/lib/log4j-core-2.1.jar:/opt/storm/lib/disruptor-3.3.2.jar:/opt/storm/lib/log4j-slf4j-impl-2.1.jar:/opt/storm/lib/log4j-over-slf4j-1.6.6.jar:/opt/storm/lib/servlet-api-2.5.jar:/opt/storm/conf -Xmx256m -Dlogfile.name=supervisor.log -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector -Dlog4j.configurationFile=/opt/storm/log4j2/cluster.xml org.apache.storm.daemon.supervisor

9. Starting the Storm UI

[root@AF-SV bin]# ./storm ui &
[1] 2806

10. Checking the Storm UI (browser access)

11. Installing the git command

① Install the git command.

[root@AF-SV bin]# yum -y install git
Loaded plugins: fastestmirror
Determining fastest mirrors
 * base: ftp.tsukuba.wide.ad.jp
 * extras: ftp.tsukuba.wide.ad.jp
 * updates: ftp.tsukuba.wide.ad.jp
base                   | 3.7 kB  00:00
extras                  | 3.4 kB  00:00
updates                 | 3.4 kB  00:00
updates/primary_db          | 3.7 MB  00:06
Setting up Install Process
Resolving Dependencies
.
.
(omitted)
.
.
Installed:
 git.x86_64 0:1.7.1-4.el6_7.1

Dependency Installed:
 libedit.x86_64 0:2.11-4.20080712cvs.1.el6 openssh-clients.x86_64 0:5.3p1-118.1.el6_8 perl.x86_64 4:5.10.1-141.el6_7.1
 perl-Error.noarch 1:0.17015-4.el6 perl-Git.noarch 0:1.7.1-4.el6_7.1 perl-Module-Pluggable.x86_64 1:3.90-141.el6_7.1
 perl-Pod-Escapes.x86_64 1:1.04-141.el6_7.1 perl-Pod-Simple.x86_64 1:3.13-141.el6_7.1 perl-libs.x86_64 4:5.10.1-141.el6_7.1
 perl-version.x86_64 3:0.77-141.el6_7.1 rsync.x86_64 0:3.0.6-12.el6

Dependency Updated:
 openssh.x86_64 0:5.3p1-118.1.el6_8 openssh-server.x86_64 0:5.3p1-118.1.el6_8

Complete!

② Verify the installation.

[root@AF-SV bin]# git --version
git version 1.7.1

12. Installing mvn

① Copy the prepared Maven archive to /opt.

[root@AF-SV ~]# cp /tmp/apache-maven-3.3.9-bin.tar.gz /opt

② Move to /opt.

[root@AF-SV ~]# cd /opt

③ Extract the archive.

[root@AF-SV ~]# tar zxvf apache-maven-3.3.9-bin.tar.gz

④ Open .bash_profile.

[root@AF-SV ~]# vi ~/.bash_profile

⑤ Edit the file.

[root@AF-SV ~]# cat ~/.bash_profile
# .bash_profile

# Get the aliases and functions
if [ -f ~/.bashrc ]; then
   . ~/.bashrc
fi

# User specific environment and startup programs

export JAVA_HOME=/usr/java/default
export M3_HOME=/opt/apache-maven-3.3.9
M3=$M3_HOME/bin

PATH=$PATH:$M3:$JAVA_HOME/bin:$HOME/bin

export PATH

⑥ Load the updated .bash_profile.

[root@AF-SV ~]# source ~/.bash_profile

⑦ Verify the installation.

[root@AF-SV ~]# mvn --version
Apache Maven 3.3.9 (bb52d8502b132ec0a5a3f4c09453c07478323dc5; 2015-11-11T01:41:47+09:00)
Maven home: /opt/apache-maven-3.3.9
Java version: 1.8.0_112, vendor: Oracle Corporation
Java home: /usr/java/jdk1.8.0_112/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "2.6.32-431.el6.x86_64", arch: "amd64", family: "unix"

13. Cloning the storm-starter repository

[storm@AF-SV ~]$ git clone git://github.com/apache/storm.git
Initialized empty Git repository in /home/storm/storm/.git/
remote: Counting objects: 113645, done.
remote: Compressing objects: 100% (57/57), done.
remote: Total 113645 (delta 21), reused 0 (delta 0), pack-reused 113579
Receiving objects: 100% (113645/113645), 160.55 MiB | 1.90 MiB/s, done.
Resolving deltas: 100% (56991/56991), done.

14. Building [storm-project]

[storm@AF-SV ~]$ pwd
/home/storm/storm/
[storm@AF-SV ~]$ mvn clean install -DskipTests=true
.
.
 Note: downloading and compiling takes roughly 2 to 3 hours.
.
.

15. Running the ExclamationTopology sample

[storm@AF-SV ~]$ /opt/storm/bin/storm jar ./storm/examples/storm-starter/target/storm-starter-2.0.0-SNAPSHOT.jar org.apache.storm.starter.ExclamationTopology ExclamationTopology
Running: /usr/java/default/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/storm -Dstorm.log.dir=/opt/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/storm/lib/asm-5.0.3.jar:/opt/storm/lib/clojure-1.7.0.jar:/opt/storm/lib/storm-rename-hack-1.0.2.jar:/opt/storm/lib/disruptor-3.3.2.jar:/opt/storm/lib/minlog-1.3.0.jar:/opt/storm/lib/slf4j-api-1.7.7.jar:/opt/storm/lib/log4j-over-slf4j-1.6.6.jar:/opt/storm/lib/servlet-api-2.5.jar:/opt/storm/lib/objenesis-2.1.jar:/opt/storm/lib/log4j-core-2.1.jar:/opt/storm/lib/kryo-3.0.3.jar:/opt/storm/lib/log4j-api-2.1.jar:/opt/storm/lib/storm-core-1.0.2.jar:/opt/storm/lib/log4j-slf4j-impl-2.1.jar:/opt/storm/lib/reflectasm-1.10.1.jar:./storm/examples/storm-starter/target/storm-starter-2.0.0-SNAPSHOT.jar:/home/storm/.storm:/opt/storm/bin -Dstorm.jar=./storm/examples/storm-starter/target/storm-starter-2.0.0-SNAPSHOT.jar org.apache.storm.starter.ExclamationTopology ExclamationTopology
1129 [main] INFO o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -8990300227665366477:-6536939782144685199
1293 [main] INFO o.a.s.s.a.AuthUtils - Got AutoCreds []
1296 [main] WARN o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
1363 [main] WARN o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
1461 [main] INFO o.a.s.StormSubmitter - Uploading topology jar ./storm/examples/storm-starter/target/storm-starter-2.0.0-SNAPSHOT.jar to assigned location: /opt/storm/data/nimbus/inbox/stormjar-294b21a9-ce69-4401-a2fd-2795fb3d2383.jar
Start uploading file './storm/examples/storm-starter/target/storm-starter-2.0.0-SNAPSHOT.jar' to '/opt/storm/data/nimbus/inbox/stormjar-294b21a9-ce69-4401-a2fd-2795fb3d2383.jar' (63551525 bytes)
[==================================================] 63551525 / 63551525
File './storm/examples/storm-starter/target/storm-starter-2.0.0-SNAPSHOT.jar' uploaded to '/opt/storm/data/nimbus/inbox/stormjar-294b21a9-ce69-4401-a2fd-2795fb3d2383.jar' (63551525 bytes)
16210 [main] INFO o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /opt/storm/data/nimbus/inbox/stormjar-294b21a9-ce69-4401-a2fd-2795fb3d2383.jar
16210 [main] INFO o.a.s.StormSubmitter - Submitting topology ExclamationTopology in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-8990300227665366477:-6536939782144685199","topology.workers":3,"topology.debug":true}
16210 [main] WARN o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
25996 [main] INFO o.a.s.StormSubmitter - Finished submitting topology: ExclamationTopology

16. Checking that ExclamationTopology is running

[root@AF-SV ~]# /opt/storm/bin/storm list
.
.
(omitted)
.
.
Topology_name      Status  Num_tasks  Num_workers  Uptime_secs
-------------------------------------------------------------------
ExclamationTopology  ACTIVE  0        0          87081

17. Stopping ExclamationTopology

[root@AF-SV ~]# /opt/storm/bin/storm kill ExclamationTopology
Running: /usr/java/default/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/storm -Dstorm.log.dir=/opt/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/storm/lib/asm-5.0.3.jar:/opt/storm/lib/clojure-1.7.0.jar:/opt/storm/lib/storm-rename-hack-1.0.2.jar:/opt/storm/lib/disruptor-3.3.2.jar:/opt/storm/lib/minlog-1.3.0.jar:/opt/storm/lib/slf4j-api-1.7.7.jar:/opt/storm/lib/log4j-over-slf4j-1.6.6.jar:/opt/storm/lib/servlet-api-2.5.jar:/opt/storm/lib/objenesis-2.1.jar:/opt/storm/lib/log4j-core-2.1.jar:/opt/storm/lib/kryo-3.0.3.jar:/opt/storm/lib/log4j-api-2.1.jar:/opt/storm/lib/storm-core-1.0.2.jar:/opt/storm/lib/log4j-slf4j-impl-2.1.jar:/opt/storm/lib/reflectasm-1.10.1.jar:/opt/storm/conf:/opt/storm/bin org.apache.storm.command.kill_topology ExclamationTopology
3538 [main] WARN o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
5216 [main] INFO o.a.s.c.kill-topology - Killed topology: ExclamationTopology

Note: ExclamationTopology no longer appears under [Topology Summary] in the Storm UI.

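3. Developing and Running the Sample Source

1. Create the sample classes

Summary of what the sample does

  ◎ Generates a tuple (line tuple) from each line of a text file.
  ◎ Splits each line tuple on commas (,) and generates a tuple (word tuple) for each word.
  ◎ Counts the number of lines, words, and characters.

Architecture diagram

The Apache Storm topology is structured as follows.

① Text file
Prepare a text file of test data.
It is a comma-separated (CSV) file.
② Spout
[1] Reads the text file.
[2] Generates one line tuple per line.

● LineReaderSpout.java
③ Tuple: line tuple
The generated line tuples are passed to the bolts.

Note: at this point the data in the tuple has only been split into lines,
so it still contains commas (,).
(Example) "aaa,bbb"
④ Bolt: word splitting
[1] Reads each tuple as it arrives.
[2] Splits it on commas (,) and generates a word tuple for each word.

Note: runs with 2 parallel threads

● WordSplitterBolt.java
⑤ Bolt: line count
[1] Counts the number of line tuples read (= number of lines).
[2] Outputs the line count when counting ends.

Note: runs with 3 parallel threads
⑥ Tuple: word tuple
The generated word tuples are passed to the bolts.

(Example)
Tuple1 → "aaa"
Tuple2 → "bbb"
⑦ Bolt: word count
[1] Reads each tuple as it arrives.
[2] Counts the occurrences of the word carried by each tuple.
- Key: word
- Value: count
[3] Outputs the count for each word when counting ends.

Note: runs with 3 parallel threads (fieldsGrouping: key = word)

● WordCounterBolt.java
⑧ Bolt: character count
[1] Reads each tuple as it arrives.
[2] Counts the characters in each tuple read.
[3] Outputs the total character count when counting ends.

Note: runs with 1 thread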
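
The data flow above can be sketched in plain Java without any Storm classes. This is only an illustration of what the bolts compute; the class name PipelineSketch and its method names are hypothetical and do not appear in the sample source, and everything runs in a single thread rather than as a topology.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the line -> word -> count flow (hypothetical class, no Storm dependency).
class PipelineSketch {

    // Mirrors the word-splitting bolt: split on commas, trim, drop empty words.
    static List<String> splitWords(String line) {
        List<String> words = new ArrayList<>();
        for (String raw : line.split(",")) {
            String word = raw.trim();
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    // Mirrors the word-count bolt: key = word, value = number of occurrences.
    static Map<String, Integer> countWords(List<String> lines) {
        Map<String, Integer> counters = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : splitWords(line)) {
                counters.merge(word, 1, Integer::sum);
            }
        }
        return counters;
    }

    // Mirrors the character-count bolt: total length of all word tuples.
    static long countCharacters(List<String> lines) {
        long total = 0;
        for (String line : lines) {
            for (String word : splitWords(line)) {
                total += word.length();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("aaa,bbb", "bbbb,cccc", "aaa,bbb");
        System.out.println("lines      = " + lines.size());
        System.out.println("words      = " + countWords(lines));
        System.out.println("characters = " + countCharacters(lines));
    }
}
```

For the three sample lines this prints a line count of 3, the word counts {aaa=2, bbb=2, bbbb=1, cccc=1}, and a character count of 20.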

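2. Create the classes that run Apache Storm in Eclipse

  ◎ HelloStorm : the class containing the main method.
  ◎ LineReaderSpout : reads the file given as an argument and generates a tuple for each line.
  ◎ WordSplitterBolt : splits each line tuple on commas and generates a tuple for each word.
  ◎ LineCounterBolt : counts the lines and prints the line count to standard output in cleanup().
  ◎ WordCounterBolt : counts occurrences per word and prints the count for each word to standard output in cleanup().
  ◎ CharacterCounterBolt : counts the characters and prints the character count to standard output in cleanup().

3. Create the sample classes in Eclipse

Create a Java project named "Storm-Starter".
Download the following files to the machine running Eclipse and add them to the build path as external libraries.
/opt/storm/lib/*.jar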

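4. Create the JAR file in Eclipse

Export the Java project as a JAR file:
StormStarter.jar
 Note: upload it to the server that runs Apache Storm.

① Right-click the project name.

② Select "Export".

③ Select "JAR file" and click "Next".

④ Select "lib" under "Storm-Starter".
 Note: leave the checkboxes as they are.

⑤ Select all the JARs shown in the right pane and click "Finish".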

5. Preparing to run on the Storm cluster: creating storm.yaml

① Create the directory.

[storm@AF-SV .storm]$ mkdir ~/.storm

② Create the storm.yaml file.

[storm@AF-SV .storm]$ vi ~/.storm/storm.yaml

③ Edit the file.

[storm@AF-SV .storm]$ cat ~/.storm/storm.yaml
nimbus.host: "XXX.XXX.XXX.XXX"

6. Running on the Storm cluster

Note: specify the JAR created in Eclipse.

① Check the contents of the text file used as input.

[storm@AF-SV ~]$ cat test.txt
aaa bbb
bbbb cccc
aaa bbb
.
.
(omitted)
.
.
aaa bbb
bbbb cccc

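② Run on the Storm cluster.

Note: per the usage message in HelloStorm, the third argument selects the mode (0 = LocalCluster, 1 = StormSubmitter); the command below passes 0.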

[storm@AF-SV ~]$ /opt/storm/bin/storm jar ./StormStarter.jar storm.starter.HelloStorm ./test.txt 20000 0
Running: /usr/java/default/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/storm -Dstorm.log.dir=/opt/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/storm/lib/asm-5.0.3.jar:/opt/storm/lib/clojure-1.7.0.jar:/opt/storm/lib/storm-rename-hack-1.0.2.jar:/opt/storm/lib/disruptor-3.3.2.jar:/opt/storm/lib/minlog-1.3.0.jar:/opt/storm/lib/slf4j-api-1.7.7.jar:/opt/storm/lib/log4j-over-slf4j-1.6.6.jar:/opt/storm/lib/servlet-api-2.5.jar:/opt/storm/lib/objenesis-2.1.jar:/opt/storm/lib/log4j-core-2.1.jar:/opt/storm/lib/kryo-3.0.3.jar:/opt/storm/lib/log4j-api-2.1.jar:/opt/storm/lib/storm-core-1.0.2.jar:/opt/storm/lib/log4j-slf4j-impl-2.1.jar:/opt/storm/lib/reflectasm-1.10.1.jar:./StormStarter.jar:/home/storm/.storm:/opt/storm/bin -Dstorm.jar=./StormStarter.jar storm.starter.HelloStorm ./test.txt 20000 0
.
.
(omitted)
.
.
★ -- Word Counter [word-counter-8]
★ -- Word Counter  aaa : 297
★ -- Word Counter  ccc : 179
★ -- Word Counter  123 : 5
★ -- Word Counter  bbb : 297
★ -- Word Counter  KKKK : 1
★ -- Word Counter  abc : 119
★ -- Word Counter  EEE : 1
★ -- Word Counter  ddd : 59
★ -- Word Counter  fff : 119
★ -- Word Counter  YYY : 1
★ -- Word Counter  XXX : 1
★ -- Word Counter  ZZZ : 1
26793 [main] INFO o.a.s.d.executor - Shut down executor word-counter:[8 8]
26793 [main] INFO o.a.s.d.executor - Shutting down executor character-counter:[2 2]
26794 [Thread-16-character-counter-executor[2 2]] INFO o.a.s.util - Async loop interrupted!
26794 [Thread-15-disruptor-executor[2 2]-send-queue] INFO o.a.s.util - Async loop interrupted!
★ -- Character Counter [3273]
26803 [main] INFO o.a.s.d.executor - Shut down executor character-counter:[2 2]
26811 [main] INFO o.a.s.d.executor - Shutting down executor word-counter:[7 7]
26811 [Thread-18-word-counter-executor[7 7]] INFO o.a.s.util - Async loop interrupted!
26812 [Thread-17-disruptor-executor[7 7]-send-queue] INFO o.a.s.util - Async loop interrupted!
★ -- Word Counter [word-counter-7]
★ -- Word Counter  Xssssa : 2
26815 [main] INFO o.a.s.d.executor - Shut down executor word-counter:[7 7]
26815 [main] INFO o.a.s.d.executor - Shutting down executor line-counter:[3 3]
26816 [Thread-19-disruptor-executor[3 3]-send-queue] INFO o.a.s.util - Async loop interrupted!
26818 [Thread-20-line-counter-executor[3 3]] INFO o.a.s.util - Async loop interrupted!
★ -- LineCounter [96]
26821 [main] INFO o.a.s.d.executor - Shut down executor line-counter:[3 3]
26821 [main] INFO o.a.s.d.executor - Shutting down executor __acker:[1 1]
26822 [Thread-22-__acker-executor[1 1]] INFO o.a.s.util - Async loop interrupted!
26822 [Thread-21-disruptor-executor[1 1]-send-queue] INFO o.a.s.util - Async loop interrupted!
26823 [main] INFO o.a.s.d.executor - Shut down executor __acker:[1 1]
26823 [main] INFO o.a.s.d.executor - Shutting down executor line-reader-spout:[6 6]
26823 [Thread-23-disruptor-executor[6 6]-send-queue] INFO o.a.s.util - Async loop interrupted!
26825 [Thread-24-line-reader-spout-executor[6 6]] INFO o.a.s.util - Async loop interrupted!
close()
26827 [main] INFO o.a.s.d.executor - Shut down executor line-reader-spout:[6 6]
26827 [main] INFO o.a.s.d.executor - Shutting down executor word-counter:[9 9]
26827 [Thread-26-word-counter-executor[9 9]] INFO o.a.s.util - Async loop interrupted!
26827 [Thread-25-disruptor-executor[9 9]-send-queue] INFO o.a.s.util - Async loop interrupted!
★ -- Word Counter [word-counter-9]
★ -- Word Counter  8890 : 5
26828 [main] INFO o.a.s.d.executor - Shut down executor word-counter:[9 9]
26828 [main] INFO o.a.s.d.executor - Shutting down executor word-splitter:[11 11]
26828 [Thread-28-word-splitter-executor[11 11]] INFO o.a.s.util - Async loop interrupted!
26828 [Thread-27-disruptor-executor[11 11]-send-queue] INFO o.a.s.util - Async loop interrupted!
26829 [main] INFO o.a.s.d.executor - Shut down executor word-splitter:[11 11]
26830 [main] INFO o.a.s.d.executor - Shutting down executor __system:[-1 -1]
26830 [Thread-30-__system-executor[-1 -1]] INFO o.a.s.util - Async loop interrupted!
26830 [Thread-29-disruptor-executor[-1 -1]-send-queue] INFO o.a.s.util - Async loop interrupted!
26837 [main] INFO o.a.s.d.executor - Shut down executor __system:[-1 -1]
26837 [main] INFO o.a.s.d.executor - Shutting down executor line-counter:[5 5]
26837 [Thread-32-line-counter-executor[5 5]] INFO o.a.s.util - Async loop interrupted!
26837 [Thread-31-disruptor-executor[5 5]-send-queue] INFO o.a.s.util - Async loop interrupted!
★ -- LineCounter [114]
26838 [main] INFO o.a.s.d.executor - Shut down executor line-counter:[5 5]
26838 [main] INFO o.a.s.d.executor - Shutting down executor word-splitter:[10 10]
26839 [Thread-34-word-splitter-executor[10 10]] INFO o.a.s.util - Async loop interrupted!
26839 [Thread-33-disruptor-executor[10 10]-send-queue] INFO o.a.s.util - Async loop interrupted!
26839 [main] INFO o.a.s.d.executor - Shut down executor word-splitter:[10 10]
26839 [main] INFO o.a.s.d.executor - Shutting down executor line-counter:[4 4]
26840 [Thread-36-line-counter-executor[4 4]] INFO o.a.s.util - Async loop interrupted!
26840 [Thread-35-disruptor-executor[4 4]-send-queue] INFO o.a.s.util - Async loop interrupted!
★ -- LineCounter [96]
.
.
(omitted)
.
.
26942 [main] INFO o.a.s.testing - Done shutting down in process zookeeper
26942 [main] INFO o.a.s.testing - Deleting temporary path /tmp/7c5244b0-1895-4c0e-b256-0eec3399422f
26947 [main] INFO o.a.s.testing - Deleting temporary path /tmp/df56a758-f853-4bf8-8b85-05cc6a11a5e0
26948 [main] INFO o.a.s.testing - Deleting temporary path /tmp/1d84bb0a-32f0-409d-8ba2-f30b443d8dc5
26949 [main] INFO o.a.s.testing - Deleting temporary path /tmp/d5ee5bf2-bb0a-4d33-9c08-b6712d461fe4
28419 [SessionTracker] INFO o.a.s.s.o.a.z.s.SessionTrackerImpl - SessionTrackerImpl exited loop!

 Note: the output shows that the lines, words, and characters were counted.
 Note: summing the output of the individual threads gives the totals.
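
As a rough illustration of that summing step, the following sketch adds up the LineCounter values printed by each task. The class name LineCounterTotal is hypothetical. The input is just the three LineCounter lines visible in the log excerpt above; because parts of the log are omitted, treat the result as an example of the method rather than a verified total for test.txt.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sums the per-task values printed by LineCounterBolt.cleanup() (hypothetical helper).
class LineCounterTotal {

    // Matches the number inside e.g. "★ -- LineCounter [96]".
    private static final Pattern LINE_COUNTER = Pattern.compile("LineCounter \\[(\\d+)\\]");

    static long sumLineCounters(List<String> logLines) {
        long total = 0;
        for (String logLine : logLines) {
            Matcher m = LINE_COUNTER.matcher(logLine);
            if (m.find()) {
                total += Long.parseLong(m.group(1));
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // The three LineCounter lines that appear in the log excerpt.
        List<String> excerpt = Arrays.asList(
                "★ -- LineCounter [96]",
                "★ -- LineCounter [114]",
                "★ -- LineCounter [96]");
        System.out.println("total lines = " + sumLineCounters(excerpt)); // 96 + 114 + 96 = 306
    }
}
```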

4. Sample Source

HelloStorm.java

package storm.starter;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class HelloStorm {
 public static String TOPOLOGY_NAME_HelloStorm="HelloStorm";
 public static void main(String[] args) throws Exception {

  if (args.length < 3) {
   System.out.println("args[0]=inputFile,args[1]=sleepTime(ms),args[2]=LocalCluster:StormSubmitter=0:1");
   System.exit(1);
  }

  Config config = new Config();
  config.put("inputFile", args[0]);
  config.setDebug(true);
  config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

  System.out.println("[HelloStorm]TopologyBuilder[Start]");
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout("line-reader-spout", new LineReaderSpout());
  builder.setBolt("word-splitter", new WordSplitterBolt(),2).shuffleGrouping("line-reader-spout");
  builder.setBolt("line-counter", new LineCounterBolt(),3).shuffleGrouping("line-reader-spout");
  builder.setBolt("word-counter", new WordCounterBolt(),3).fieldsGrouping("word-splitter",new Fields("word"));
  builder.setBolt("character-counter", new CharacterCounterBolt()).shuffleGrouping("word-splitter");

  System.out.println("[HelloStorm]TopologyBuilder[End]");

  boolean flgLocalCluster= true;

  try{
   if(args[2].equals("1"))
   flgLocalCluster= false;
  }catch(Exception e){
  }

  if(flgLocalCluster){
   System.out.println("[HelloStorm]LocalCluster[Start]");
   System.out.println("[HelloStorm]TOPOLOGY_NAME="+TOPOLOGY_NAME_HelloStorm);

   LocalCluster cluster = new LocalCluster();
   cluster.submitTopology(TOPOLOGY_NAME_HelloStorm, config, builder.createTopology());
   System.out.println("[HelloStorm]LocalCluster.submitTopology()[End]");
   long ms = 10000;

   try {
    ms = Long.parseLong(args[1]);
   } catch (Exception e) {
   }
   System.out.println("[HelloStorm]sleep(" + ms + ")[Start]");
   Thread.sleep(ms);
   cluster.killTopology(TOPOLOGY_NAME_HelloStorm);
   System.out.println("[HelloStorm]cluster.shutdown()");
   cluster.shutdown();
  }else{
   System.out.println("[HelloStorm]StormSubmitter[Start]");
   StormSubmitter.submitTopology(TOPOLOGY_NAME_HelloStorm, config, builder.createTopology());
   System.out.println("[HelloStorm]StormSubmitter[End]");
  }
 }
}
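
HelloStorm connects "word-counter" to "word-splitter" with fieldsGrouping on the "word" field and a parallelism hint of 3. With a fields grouping, tuples carrying the same field value are routed to the same task, which is why each word appears in the output of only one word-counter task. The sketch below illustrates that idea with a simple hash-modulo rule. It is an assumption made for illustration only: the class FieldsGroupingSketch is hypothetical, and Storm's internal task selection may differ.

```java
import java.util.Arrays;

// Illustration of "same key -> same task" routing (hypothetical class, not Storm's implementation).
class FieldsGroupingSketch {

    // Picks a task index in [0, numTasks) from the word's hash code.
    // Math.floorMod keeps the result non-negative even for negative hash codes.
    static int taskFor(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 3; // parallelism hint used for "word-counter" in HelloStorm
        for (String word : Arrays.asList("aaa", "bbb", "aaa", "ccc", "bbb")) {
            System.out.println(word + " -> task " + taskFor(word, numTasks));
        }
    }
}
```

Because the task index depends only on the word, every occurrence of "aaa" lands on the same index, so a per-task HashMap such as the one in WordCounterBolt holds the complete count for that word.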

LineReaderSpout.java

package storm.starter;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class LineReaderSpout implements IRichSpout {
 private SpoutOutputCollector collector;
 private FileReader fileReader;
 private boolean completed = false;
 private TopologyContext context;

 @Override
 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  try {
   System.out.println("[LineReaderSpout]inputFile=" + conf.get("inputFile").toString());
   this.context = context;
   this.fileReader = new FileReader(conf.get("inputFile").toString());
  } catch (FileNotFoundException e) {
   throw new RuntimeException("Error reading file " + conf.get("inputFile"));
  }
  this.collector = collector;
 }

 @Override
 public void nextTuple() {
  System.out.println("nextTuple()");
  if (completed) {
   try {
    Thread.sleep(1000);
   } catch (InterruptedException e) {
   }
  }
  String str;
  BufferedReader reader = new BufferedReader(fileReader);
  try {
   while ((str = reader.readLine()) != null) {
    System.out.println("str=" + str);
    this.collector.emit(new Values(str), str);
   }
  } catch (Exception e) {
   throw new RuntimeException("Error reading tuple", e);
  } finally {
   completed = true;
  }
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields("line"));
 }

 @Override
 public void close() {
  System.out.println("close()");
  try {
   fileReader.close();
  } catch (IOException e) {
   e.printStackTrace();
  }
 }

 public boolean isDistributed() {
  return false;
 }

 @Override
 public void activate() {
  // TODO Auto-generated method stub
 }

 @Override
 public void deactivate() {
 // TODO Auto-generated method stub
 }

 @Override
 public void ack(Object msgId) {
  System.out.println("Got ACK for msgId : " + msgId);
 }

 @Override
 public void fail(Object msgId) {
  System.out.println("Got FAIL for msgId : " + msgId);
 }

 @Override
 public Map<String, Object> getComponentConfiguration() {
  return null;
 }
}
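
The spout above reads the whole file inside a single nextTuple() call and wraps the FileReader in a new BufferedReader on every call. One possible variation, shown here as a plain-Java sketch and not as the author's implementation, keeps a single BufferedReader and hands out one line per call so that each call returns quickly. The class OneLinePerCallReader and its nextLine() method are hypothetical; in a spout, a non-null result would be what gets passed to collector.emit(...).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;

// Hands out one line per call from a single shared reader (hypothetical helper, no Storm dependency).
class OneLinePerCallReader {
    private final BufferedReader reader;
    private boolean completed = false;

    OneLinePerCallReader(BufferedReader reader) {
        this.reader = reader;
    }

    // Returns the next line, or null once the input is exhausted.
    String nextLine() {
        if (completed) {
            return null;
        }
        try {
            String line = reader.readLine();
            if (line == null) {
                completed = true;
            }
            return line;
        } catch (IOException e) {
            throw new UncheckedIOException("Error reading line", e);
        }
    }

    public static void main(String[] args) {
        // A StringReader stands in for the input file in this sketch.
        OneLinePerCallReader source =
                new OneLinePerCallReader(new BufferedReader(new StringReader("aaa,bbb\nbbbb,cccc\n")));
        String line;
        while ((line = source.nextLine()) != null) {
            System.out.println("line=" + line);
        }
    }
}
```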

LineCounterBolt.java

package storm.starter;

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public class LineCounterBolt implements IRichBolt{
 Integer id;
 String name;
 long lineCount;
 private OutputCollector collector;

 @Override
 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  this.collector = collector;
  this.name = context.getThisComponentId();
  this.id = context.getThisTaskId();
 }

 @Override
 public void execute(Tuple input) {
  String str = input.getString(0);
  System.out.println("LineCounterBolt str = ["+ str + "]");
  lineCount++;
  collector.ack(input);
 }

 @Override
 public void cleanup() {
  System.out.println("★ -- LineCounter ["+ lineCount + "]");
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  // TODO Auto-generated method stub
 }

 @Override
 public Map<String, Object> getComponentConfiguration() {
  // TODO Auto-generated method stub
  return null;
 }
}

WordSplitterBolt.java

package storm.starter;

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordSplitterBolt implements IRichBolt{
 private OutputCollector collector;

 @Override
 public void prepare(Map stormConf, TopologyContext context,
 OutputCollector collector) {
  this.collector = collector;
 }

 @Override
 public void execute(Tuple input) {
  String sentence = input.getString(0);
  System.out.println("sentence=" + sentence);
  String[] words = sentence.split(",");

  for(String word: words){
   word = word.trim();
   System.out.println("word=" + word);
   if(!word.isEmpty()){
    System.out.println("■collector.emit■word=" + word);
    collector.emit(new Values(word));
   }
  }
  collector.ack(input);
 }

 @Override
 public void cleanup() {
  // TODO Auto-generated method stub
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields("word"));
 }

 @Override
 public Map<String, Object> getComponentConfiguration() {
  // TODO Auto-generated method stub
  return null;
 }
}

WordCounterBolt.java

package storm.starter;

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public class WordCounterBolt implements IRichBolt{
 Integer id;
 String name;
 Map<String, Integer> counters;
 private OutputCollector collector;

 @Override
 public void prepare(Map stormConf, TopologyContext context,
  OutputCollector collector) {
  this.counters = new HashMap<String, Integer>();
  this.collector = collector;
  this.name = context.getThisComponentId();
  this.id = context.getThisTaskId();
 }

 @Override
 public void execute(Tuple input) {
  String str = input.getString(0);
  if(!counters.containsKey(str)){
   counters.put(str, 1);
  }else{
   Integer c = counters.get(str) +1;
   counters.put(str, c);
  }
  collector.ack(input);
 }

 @Override
 public void cleanup() {
  System.out.println("★ -- Word Counter ["+ name + "-"+id +"]");
  for(Map.Entry<String, Integer> entry:counters.entrySet()){
   System.out.println("★ -- Word Counter \t"+entry.getKey()+" : " + entry.getValue());
  }
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  // TODO Auto-generated method stub
 }

 @Override
 public Map<String, Object> getComponentConfiguration() {
  // TODO Auto-generated method stub
  return null;
 }
}

CharacterCounterBolt.java

package storm.starter;

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public class CharacterCounterBolt implements IRichBolt{
 Integer id;
 String name;
 long characters;
 private OutputCollector collector;

 @Override
 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  this.collector = collector;
  this.name = context.getThisComponentId();
  this.id = context.getThisTaskId();
 }

 @Override
 public void execute(Tuple input) {
  String str = input.getString(0);
  long size = str.length();
  System.out.println("str=["+ str +"] Size="+size);
  characters = characters + size;
  collector.ack(input);
 }

 @Override
 public void cleanup() {
  System.out.println("★ -- Character Counter ["+characters +"]");
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  // TODO Auto-generated method stub
 }

 @Override
 public Map<String, Object> getComponentConfiguration() {
  // TODO Auto-generated method stub
  return null;
 }
}

5. Downloads

  ◎ HelloStorm.java >>HelloStorm.java
  ◎ LineReaderSpout.java >>LineReaderSpout.java
  ◎ LineCounterBolt.java >>LineCounterBolt.java
  ◎ WordSplitterBolt.java >>WordSplitterBolt.java
  ◎ WordCounterBolt.java >>WordCounterBolt.java
  ◎ CharacterCounterBolt.java >>CharacterCounterBolt.java
  ◎ test.txt >>test.txt
  ◎ Output log >>8fo97cd838d83o.log

6. References

  ◎ Apache Storm official website