Building an Apache Storm Environment and Running the Sample Source
This document describes the steps used to set up Apache Storm in-house.
Table of Contents
Building an Apache Storm Environment and Running the Sample Source
1. Apache Storm Overview
An open-source, fault-tolerant framework for fast, distributed, near-real-time processing of big data.
Features
◎ Instead of accumulating stream data before starting to process it, Storm keeps processing non-stop while reading the data in.
◎ It provides fast, near-real-time analysis of continuously generated large-scale data.
◎ Because it processes records one at a time in memory, it achieves low-latency processing with a high degree of real-time responsiveness.
Suitable use cases
Apache Storm is well suited to scenarios that demand real-time processing, such as the following.
◎ Fraud detection
◎ Clickstream analysis
◎ Alerting on financial transactions
◎ Monitoring large numbers of IoT devices
◎ Social analytics
◎ Network monitoring, etc.
2. Apache Storm Installation Steps
1. Install wget
① wget is used to download JDK 8, so install it if it is not already present.
[root@AF-SV ~]# yum install wget
Loaded plugins: fastestmirror
Loading mirror speeds from cached hostfile
* base: ftp.iij.ad.jp
* extras: ftp.iij.ad.jp
* updates: ftp.iij.ad.jp
base | 3.7 kB 00:00
extras | 3.4 kB 00:00
updates | 3.4 kB 00:00
Setting up Install Process
Resolving Dependencies
--> Running transaction check
---> Package wget.x86_64 0:1.12-8.el6 will be installed
--> Finished Dependency Resolution
Dependencies Resolved
======================================================
Package Arch Version Repository Size
======================================================
Installing:
wget x86_64 1.12-8.el6 base 484 k
Transaction Summary
======================================================
Install 1 Package(s)
Total download size: 484 k
Installed size: 1.8 M
② When prompted to confirm the installation, enter [y].
Is this ok [y/N]:y
Downloading Packages:
wget-1.12-8.el6.x86_64.rpm
| 484 kB 00:00
Running rpm_check_debug
Running Transaction Test
Transaction Test Succeeded
Running Transaction
Installing : wget-1.12-8.el6.x86_64
1/1
Verifying : wget-1.12-8.el6.x86_64
1/1
Installed:
wget.x86_64 0:1.12-8.el6
Complete!
2. Install the JDK
① Download the 64-bit JDK 8 for the Linux platform from Oracle.
[root@AF-SV ~]# wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u112-b15/jdk-8u112-linux-x64.rpm
--2016-12-20 00:25:28-- http://download.oracle.com/otn-pub/java/jdk/8u112-b15/jdk-8u112-linux-x64.rpm
Resolving download.oracle.com... 104.80.89.16, 104.80.89.58
Connecting to download.oracle.com|104.80.89.16|:80... connected.
HTTP request sent, awaiting response... 302 Moved Temporarily
Location: https://edelivery.oracle.com/otn-pub/java/jdk/8u112-b15/jdk-8u112-linux-x64.rpm [following]
--2016-12-20 00:25:28-- https://edelivery.oracle.com/otn-pub/java/jdk/8u112-b15/jdk-8u112-linux-x64.rpm
Resolving edelivery.oracle.com... 23.47.129.179
Connecting to edelivery.oracle.com|23.47.129.179|:443... connected.
HTTP request sent, awaiting response... 302 Moved Temporarily
Location: http://download.oracle.com/otn-pub/java/jdk/8u112-b15/jdk-8u112-linux-x64.rpm?AuthParam=1482161248_18c86e5cb00e8780bc3d79fe96b79290 [following]
--2016-12-20 00:25:28-- http://download.oracle.com/otn-pub/java/jdk/8u112-b15/jdk-8u112-linux-x64.rpm?AuthParam=1482161248_18c86e5cb00e8780bc3d79fe96b79290
Connecting to download.oracle.com|104.80.89.16|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 167741674 (160M) [application/x-redhat-package-manager]
Saving to: “jdk-8u112-linux-x64.rpm”
100%[========================>] 167,741,674 4.62M/s in 39s
2016-12-20 00:26:07 (4.13 MB/s) - “jdk-8u112-linux-x64.rpm” saved [167741674/167741674]
② Install the downloaded rpm package.
[root@AF-SV ~]# rpm -ivh jdk-8u112-linux-x64.rpm
Preparing... ########################################### [100%]
1:jdk1.8.0_112 ########################################### [100%]
Unpacking JAR files...
tools.jar...
plugin.jar...
javaws.jar...
deploy.jar...
rt.jar...
jsse.jar...
charsets.jar...
localedata.jar...
③ Confirm that Java is installed correctly.
[root@AF-SV ~]# java -version
java version "1.8.0_112"
Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
④ Set the path (to keep JAVA_HOME set across logins, also add the export line to ~/.bashrc before sourcing it).
[root@AF-SV java]# export JAVA_HOME=/usr/java/default
[root@AF-SV java]# source ~/.bashrc
3. Install ZooKeeper
① Move to /opt.
[root@AF-SV java]# cd /opt
② Download ZooKeeper (ver. 3.4.9).
[root@AF-SV opt]# wget http://www-eu.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
--2016-12-20 00:29:36-- http://www-eu.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
Resolving www-eu.apache.org... 88.198.26.2
Connecting to www-eu.apache.org|88.198.26.2|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 22724574 (22M) [application/x-gzip]
Saving to: “zookeeper-3.4.9.tar.gz”
100%[=====================>] 22,724,574 3.51M/s in 7.9s
2016-12-20 00:29:44 (2.74 MB/s) - “zookeeper-3.4.9.tar.gz” saved [22724574/22724574]
③ Extract the archive.
[root@AF-SV opt]# tar -zxvf zookeeper-3.4.9.tar.gz
④ Rename the directory to something easier to recognize.
[root@AF-SV opt]# mv zookeeper-3.4.9 zookeeper
⑤ Create the data directory.
[root@AF-SV conf]# mkdir /opt/zookeeper/data/
⑥ Create the configuration file zoo.cfg.
[root@AF-SV conf]# vi /opt/zookeeper/conf/zoo.cfg
⑦ Edit the configuration file (tickTime is ZooKeeper's base time unit in milliseconds, dataDir is where snapshots are stored, clientPort is the port clients connect to, and initLimit/syncLimit are tick counts governing follower connection and synchronization).
[root@AF-SV conf]# cat /opt/zookeeper/conf/zoo.cfg
tickTime=2000
dataDir=/opt/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2
4. Start ZooKeeper
[root@AF-SV opt]# /opt/zookeeper/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
5. Install Apache Storm
① Download Apache Storm (ver. 1.0.2).
[root@AF-SV opt]# wget http://www-eu.apache.org/dist/storm/apache-storm-1.0.2/apache-storm-1.0.2.tar.gz
--2016-12-20 00:34:57-- http://www-eu.apache.org/dist/storm/apache-storm-1.0.2/apache-storm-1.0.2.tar.gz
Resolving www-eu.apache.org... 88.198.26.2
Connecting to www-eu.apache.org|88.198.26.2|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 179161400 (171M) [application/x-gzip]
Saving to: “apache-storm-1.0.2.tar.gz”
100%[====================>] 179,161,400 4.50M/s in 42s
2016-12-20 00:35:39 (4.11 MB/s) - “apache-storm-1.0.2.tar.gz” saved [179161400/179161400]
② Extract the downloaded archive.
[root@AF-SV opt]# tar -zxvf apache-storm-1.0.2.tar.gz
③ Rename the directory to something easier to recognize.
[root@AF-SV opt]# mv apache-storm-1.0.2 storm
④ Create the data directory.
[root@AF-SV storm]# mkdir /opt/storm/data/
⑤ Create the configuration file storm.yaml.
[root@AF-SV conf]# vi storm.yaml
⑥ Edit the configuration file (the ZooKeeper server list, Storm's local data directory, the Nimbus host, and the four supervisor worker slot ports).
[root@AF-SV opt]# cat /opt/storm/conf/storm.yaml
storm.zookeeper.servers:
- "localhost"
storm.local.dir: "/opt/storm/data"
nimbus.host: "localhost"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
6. Add the host name to hosts
① Open the hosts file for editing.
[root@AF-SV bin]# vi /etc/hosts
② Edit the file.
[root@AF-SV bin]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 AF-SV.CO12
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
③ Check the files under the Storm bin directory.
[root@AF-SV opt]# cd /opt/storm/bin/
[root@AF-SV bin]# ll
total 60
-rwxr-xr-x. 1 502 games 4214 Jul 27 04:11 flight.bash
-rwxr-xr-x. 1 502 games 2179 Jul 27 04:11 storm
-rwxr-xr-x. 1 502 games 8925 Jul 27 04:11 storm.cmd
-rwxr-xr-x. 1 502 games 4147 Jul 27 04:11 storm-config.cmd
-rwxr-xr-x. 1 502 games 28323 Jul 27 04:11 storm.py
7. Start Storm Nimbus
[root@AF-SV bin]# ./storm nimbus &
[1] 2636
# Running: /usr/java/default/bin/java -server -Ddaemon.name=nimbus -Dstorm.options= -Dstorm.home=/opt/storm -Dstorm.log.dir=/opt/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/storm/lib/storm-core-1.0.2.jar:/opt/storm/lib/storm-rename-hack-1.0.2.jar:/opt/storm/lib/reflectasm-1.10.1.jar:/opt/storm/lib/objenesis-2.1.jar:/opt/storm/lib/minlog-1.3.0.jar:
/opt/storm/lib/asm-5.0.3.jar:/opt/storm/lib/log4j-api-2.1.jar:/opt/storm/lib/slf4j-api-1.7.7.jar:/opt/storm/lib/kryo-3.0.3.jar:/opt/storm/lib/clojure-1.7.0.jar:/opt/storm/lib/log4j-core-2.1.jar:/opt/storm/lib/disruptor-3.3.2.jar:/opt/storm/lib/log4j-slf4j-impl-2.1.jar:/opt/storm/lib/log4j-over-slf4j-1.6.6.jar:/opt/storm/lib/servlet-api-2.5.jar:/opt/storm/conf -Xmx1024m -Dlogfile.name=nimbus.log -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector -Dlog4j.configurationFile=/opt/storm/log4j2/cluster.xml org.apache.storm.daemon.nimbus
8. Start Storm Supervisor
[root@AF-SV bin]# ./storm supervisor &
[1] 2721
# Running: /usr/java/default/bin/java -server -Ddaemon.name=supervisor -Dstorm.options= -Dstorm.home=/opt/storm -Dstorm.log.dir=/opt/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/storm/lib/storm-core-1.0.2.jar:/opt/storm/lib/storm-rename-hack-1.0.2.jar:/opt/storm/lib/reflectasm-1.10.1.jar:/opt/storm/lib/objenesis-2.1.jar:/opt/storm/lib/minlog-1.3.0.jar:
/opt/storm/lib/asm-5.0.3.jar:/opt/storm/lib/log4j-api-2.1.jar:/opt/storm/lib/slf4j-api-1.7.7.jar:/opt/storm/lib/kryo-3.0.3.jar:/opt/storm/lib/clojure-1.7.0.jar:/opt/storm/lib/log4j-core-2.1.jar:/opt/storm/lib/disruptor-3.3.2.jar:/opt/storm/lib/log4j-slf4j-impl-2.1.jar:/opt/storm/lib/log4j-over-slf4j-1.6.6.jar:/opt/storm/lib/servlet-api-2.5.jar:/opt/storm/conf -Xmx256m -Dlogfile.name=supervisor.log -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector -Dlog4j.configurationFile=/opt/storm/log4j2/cluster.xml org.apache.storm.daemon.supervisor
9. Start Storm UI
[root@AF-SV bin]# ./storm ui &
[1] 2806
10. Check the Storm UI (access it from a browser; by default the UI listens on port 8080, i.e. http://<server address>:8080).
11. Install the git command
① Install git.
[root@AF-SV bin]# yum -y install git
Loaded plugins: fastestmirror
Determining fastest mirrors
* base: ftp.tsukuba.wide.ad.jp
* extras: ftp.tsukuba.wide.ad.jp
* updates: ftp.tsukuba.wide.ad.jp
base | 3.7 kB 00:00
extras | 3.4 kB 00:00
updates | 3.4 kB 00:00
updates/primary_db | 3.7 MB 00:06
Setting up Install Process
Resolving Dependencies
・
・
(omitted)
・
・
Installed:
git.x86_64 0:1.7.1-4.el6_7.1
Dependency Installed:
libedit.x86_64 0:2.11-4.20080712cvs.1.el6 openssh-clients.x86_64 0:5.3p1-118.1.el6_8 perl.x86_64 4:5.10.1-141.el6_7.1
perl-Error.noarch 1:0.17015-4.el6 perl-Git.noarch 0:1.7.1-4.el6_7.1 perl-Module-Pluggable.x86_64 1:3.90-141.el6_7.1
perl-Pod-Escapes.x86_64 1:1.04-141.el6_7.1 perl-Pod-Simple.x86_64 1:3.13-141.el6_7.1 perl-libs.x86_64 4:5.10.1-141.el6_7.1
perl-version.x86_64 3:0.77-141.el6_7.1 rsync.x86_64 0:3.0.6-12.el6
Dependency Updated:
openssh.x86_64 0:5.3p1-118.1.el6_8 openssh-server.x86_64 0:5.3p1-118.1.el6_8
Complete!
② Confirm that the installation completed.
[root@AF-SV bin]# git --version
git version 1.7.1
12. Install mvn
① Copy the prepared Maven archive to /opt.
[root@AF-SV ~]# cp /tmp/apache-maven-3.3.9-bin.tar.gz /opt
② Move to /opt.
[root@AF-SV ~]# cd /opt
③ Extract the archive.
[root@AF-SV ~]# tar zxvf apache-maven-3.3.9-bin.tar.gz
④ Open .bash_profile.
[root@AF-SV ~]# vi .bash_profile
⑤ Edit the file.
[root@AF-SV ~]# cat .bash_profile
# .bash_profile
# Get the aliases and functions
if [ -f ~/.bashrc ]; then
. ~/.bashrc
fi
# User specific environment and startup programs
export JAVA_HOME=/usr/java/default
export M3_HOME=/opt/apache-maven-3.3.9
M3=$M3_HOME/bin
PATH=$PATH:$M3:$JAVA_HOME/bin:$HOME/bin
export PATH
⑥ Load the updated .bash_profile.
[root@AF-SV ~]# source ~/.bash_profile
⑦ Confirm that the installation completed.
[root@AF-SV ~]# mvn --version
Apache Maven 3.3.9 (bb52d8502b132ec0a5a3f4c09453c07478323dc5; 2015-11-11T01:41:47+09:00)
Maven home: /opt/apache-maven-3.3.9
Java version: 1.8.0_112, vendor: Oracle Corporation
Java home: /usr/java/jdk1.8.0_112/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "2.6.32-431.el6.x86_64", arch: "amd64", family: "unix"
13. Clone the storm-starter repository
[storm@AF-SV ~]$ git clone git://github.com/apache/storm.git
Initialized empty Git repository in /home/storm/storm/.git/
remote: Counting objects: 113645, done.
remote: Compressing objects: 100% (57/57), done.
remote: Total 113645 (delta 21), reused 0 (delta 0), pack-reused 113579
Receiving objects: 100% (113645/113645), 160.55 MiB | 1.90 MiB/s, done.
Resolving deltas: 100% (56991/56991), done.
14. Build [storm-project]
[storm@AF-SV ~]$ pwd
/home/storm/storm/
[storm@AF-SV ~]$ mvn clean install -DskipTests=true
・
・
※ Downloading and compiling takes roughly 2 to 3 hours.
・
・
15. Run the ExclamationTopology sample (a storm-starter example topology that appends "!!!" to words emitted by a test word spout).
[storm@AF-SV ~]$ /opt/storm/bin/storm jar ./storm/examples/storm-starter/target/storm-starter-2.0.0-SNAPSHOT.jar org.apache.storm.starter.ExclamationTopology ExclamationTopology
Running: /usr/java/default/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/storm -Dstorm.log.dir=/opt/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/storm/lib/asm-5.0.3.jar:/opt/storm/lib/clojure-1.7.0.jar:/opt/storm/lib/storm-rename-hack-1.0.2.jar:/opt/storm/lib/disruptor-3.3.2.jar:/opt/storm/lib/minlog-1.3.0.jar:/opt/storm/lib/slf4j-api-1.7.7.jar:/opt/storm/lib/log4j-over-slf4j-1.6.6.jar:/opt/storm/lib/servlet-api-2.5.jar:/opt/storm/lib/objenesis-2.1.jar:/opt/storm/lib/log4j-core-2.1.jar:/opt/storm/lib/kryo-3.0.3.jar:/opt/storm/lib/log4j-api-2.1.jar:/opt/storm/lib/storm-core-1.0.2.jar:/opt/storm/lib/log4j-slf4j-impl-2.1.jar:/opt/storm/lib/reflectasm-1.10.1.jar:./storm/examples/storm-starter/target/storm-starter-2.0.0-SNAPSHOT.jar:/home/storm/.storm:/opt/storm/bin -Dstorm.jar=./storm/examples/storm-starter/target/storm-starter-2.0.0-SNAPSHOT.jar org.apache.storm.starter.ExclamationTopology ExclamationTopology
1129 [main] INFO o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -8990300227665366477:-6536939782144685199
1293 [main] INFO o.a.s.s.a.AuthUtils - Got AutoCreds []
1296 [main] WARN o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
1363 [main] WARN o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
1461 [main] INFO o.a.s.StormSubmitter - Uploading topology jar ./storm/examples/storm-starter/target/storm-starter-2.0.0-SNAPSHOT.jar to assigned location: /opt/storm/data/nimbus/inbox/stormjar-294b21a9-ce69-4401-a2fd-2795fb3d2383.jar
Start uploading file './storm/examples/storm-starter/target/storm-starter-2.0.0-SNAPSHOT.jar' to '/opt/storm/data/nimbus/inbox/stormjar-294b21a9-ce69-4401-a2fd-2795fb3d2383.jar' (63551525 bytes)
[==================================================] 63551525 / 63551525
File './storm/examples/storm-starter/target/storm-starter-2.0.0-SNAPSHOT.jar' uploaded to '/opt/storm/data/nimbus/inbox/stormjar-294b21a9-ce69-4401-a2fd-2795fb3d2383.jar' (63551525 bytes)
16210 [main] INFO o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /opt/storm/data/nimbus/inbox/stormjar-294b21a9-ce69-4401-a2fd-2795fb3d2383.jar
16210 [main] INFO o.a.s.StormSubmitter - Submitting topology ExclamationTopology in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-8990300227665366477:-6536939782144685199","topology.workers":3,"topology.debug":true}
16210 [main] WARN o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
25996 [main] INFO o.a.s.StormSubmitter - Finished submitting topology: ExclamationTopology
16. Verify that ExclamationTopology is running
[root@AF-SV ~]# /opt/storm/bin/storm list
・
・
(omitted)
・
・
Topology_name Status Num_tasks Num_workers Uptime_secs
-------------------------------------------------------------------
ExclamationTopology ACTIVE 0 0 87081
17. Stop ExclamationTopology
[root@AF-SV ~]# /opt/storm/bin/storm kill ExclamationTopology
Running: /usr/java/default/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/storm -Dstorm.log.dir=/opt/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/storm/lib/asm-5.0.3.jar:/opt/storm/lib/clojure-1.7.0.jar:/opt/storm/lib/storm-rename-hack-1.0.2.jar:/opt/storm/lib/disruptor-3.3.2.jar:/opt/storm/lib/minlog-1.3.0.jar:/opt/storm/lib/slf4j-api-1.7.7.jar:/opt/storm/lib/log4j-over-slf4j-1.6.6.jar:/opt/storm/lib/servlet-api-2.5.jar:/opt/storm/lib/objenesis-2.1.jar:/opt/storm/lib/log4j-core-2.1.jar:/opt/storm/lib/kryo-3.0.3.jar:/opt/storm/lib/log4j-api-2.1.jar:/opt/storm/lib/storm-core-1.0.2.jar:/opt/storm/lib/log4j-slf4j-impl-2.1.jar:/opt/storm/lib/reflectasm-1.10.1.jar:/opt/storm/conf:/opt/storm/bin org.apache.storm.command.kill_topology ExclamationTopology
3538 [main] WARN o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
5216 [main] INFO o.a.s.c.kill-topology - Killed topology: ExclamationTopology
※ 'ExclamationTopology' no longer appears under [Topology Summary] in the Storm UI.
3. Developing and Running the Sample Source
1. Create the sample classes
Overview of what the sample does
◎ Each line of a text file is emitted as a tuple (a line tuple).
◎ Each line tuple is split on commas (,) and each resulting word is emitted as a tuple (a word tuple); for example, the line "aaa,bbb" produces the word tuples "aaa" and "bbb".
◎ The number of lines, words, and characters is counted.
Architecture
The Apache Storm topology for this sample is structured as follows (the wiring excerpt after this list shows how the stages map to code).
① Text file: prepare a test-data text file, a comma-separated (,) CSV file.
② Spout: reads the text file and generates one line tuple per line. ●LineReaderSpout.java
③ Tuple (line tuple): the generated line tuple is passed to the bolts. At this point the tuple still holds the whole line, so it still contains commas (,) (e.g. "aaa,bbb").
④ Bolt (word split): reads each tuple as it arrives, splits it on commas (,), and generates one word tuple per word. ※2 parallel threads ●WordSplitterBolt.java
⑤ Bolt (line count): counts the number of line tuples received (= the number of lines) and prints the line count when counting finishes. ※3 parallel threads ●LineCounterBolt.java
⑥ Tuple (word tuple): the generated word tuples are passed to the downstream bolts (e.g. Tuple1 → "aaa", Tuple2 → "bbb").
⑦ Bolt (word count): reads each tuple as it arrives and counts occurrences per word (key = word, value = count), printing the per-word counts when counting finishes. ※3 parallel threads (fieldsGrouping, key = word) ●WordCounterBolt.java
⑧ Bolt (character count): reads each tuple as it arrives and counts its characters, printing the character count when counting finishes. ※1 thread ●CharacterCounterBolt.java
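For reference, the wiring that realizes this pipeline is the following excerpt from HelloStorm.java (listed in full in section 4); the component names correspond to the stages above.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("line-reader-spout", new LineReaderSpout());                                                   // (2) read the file, emit one line tuple per line
builder.setBolt("word-splitter", new WordSplitterBolt(), 2).shuffleGrouping("line-reader-spout");              // (4) split each line on commas
builder.setBolt("line-counter", new LineCounterBolt(), 3).shuffleGrouping("line-reader-spout");                // (5) count the lines
builder.setBolt("word-counter", new WordCounterBolt(), 3).fieldsGrouping("word-splitter", new Fields("word")); // (7) count per word (the same word always goes to the same task)
builder.setBolt("character-counter", new CharacterCounterBolt()).shuffleGrouping("word-splitter");             // (8) count the characters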
2. Create the classes that run the Apache Storm topology in Eclipse
◎ HelloStorm : the class containing the main method.
◎ LineReaderSpout : reads the file given as an argument and generates one tuple per line.
◎ WordSplitterBolt : splits each line tuple on commas and generates one tuple per word.
◎ LineCounterBolt : counts the lines and prints the line count to standard output in cleanup().
◎ WordCounterBolt : counts occurrences per word and prints the per-word counts to standard output in cleanup().
◎ CharacterCounterBolt : counts the characters and prints the character count to standard output in cleanup().
3. Create the sample classes in Eclipse
Create a Java Project named "Storm-Starter".
Download the following to the Eclipse machine, add them as external libraries, and include them in the build path:
/opt/storm/lib/*.jar
4. Create the Jar file in Eclipse
Export the Java project as a Jar file:
StormStarter.jar
※ Upload it to the Apache Storm server in advance.
① Right-click the project name.
② Select "Export".
③ Select "JAR file" and click "Next".
④ Select "lib" under "Storm-Starter".
※ Leave the checkboxes as they are.
⑤ Select all the jars shown in the right-hand pane and click "Finish".
5. Preparing to run on the Storm cluster: create storm.yaml
① Create the directory.
[storm@AF-SV .storm]$ mkdir ~/.storm
② Create the storm.yaml file.
[storm@AF-SV .storm]$ vi ~/.storm/storm.yaml
③ Edit the file so it points at the Nimbus host (the Storm client logs in section 2 warn that nimbus.host is deprecated in favor of nimbus.seeds, but it still works on 1.0.2).
[storm@AF-SV .storm]$ cat ~/.storm/storm.yaml
nimbus.host: "XXX.XXX.XXX.XXX"
6. Run on the Storm cluster
※ Specify the Jar built in Eclipse.
① Check the contents of the input text file.
[storm@AF-SV ~]$ cat test.txt
aaa bbb
bbbb cccc
aaa bbb
・
・
(omitted)
・
・
aaa bbb
bbbb cccc
② Run the topology (arguments: the input file, the sleep time in milliseconds, and a flag where 1 submits via StormSubmitter and any other value runs in a LocalCluster; the run below uses 0, i.e. local mode).
[storm@AF-SV ~]$ /opt/storm/bin/storm jar ./StormStarter.jar storm.starter.HelloStorm ./test.txt 20000 0
Running: /usr/java/default/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/storm -Dstorm.log.dir=/opt/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/storm/lib/asm-5.0.3.jar:/opt/storm/lib/clojure-1.7.0.jar:/opt/storm/lib/storm-rename-hack-1.0.2.jar:/opt/storm/lib/disruptor-3.3.2.jar:/opt/storm/lib/minlog-1.3.0.jar:/opt/storm/lib/slf4j-api-1.7.7.jar:/opt/storm/lib/log4j-over-slf4j-1.6.6.jar:/opt/storm/lib/servlet-api-2.5.jar:/opt/storm/lib/objenesis-2.1.jar:/opt/storm/lib/log4j-core-2.1.jar:/opt/storm/lib/kryo-3.0.3.jar:/opt/storm/lib/log4j-api-2.1.jar:/opt/storm/lib/storm-core-1.0.2.jar:/opt/storm/lib/log4j-slf4j-impl-2.1.jar:/opt/storm/lib/reflectasm-1.10.1.jar:./StormStarter.jar:/home/storm/.storm:/opt/storm/bin -Dstorm.jar=./StormStarter.jar storm.starter.HelloStorm ./test.txt 20000 0
・
・
(omitted)
・
・
★ -- Word Counter [word-counter-8]
★ -- Word Counter aaa : 297
★ -- Word Counter ccc : 179
★ -- Word Counter 123 : 5
★ -- Word Counter bbb : 297
★ -- Word Counter KKKK : 1
★ -- Word Counter abc : 119
★ -- Word Counter EEE : 1
★ -- Word Counter ddd : 59
★ -- Word Counter fff : 119
★ -- Word Counter YYY : 1
★ -- Word Counter XXX : 1
★ -- Word Counter ZZZ : 1
26793 [main] INFO o.a.s.d.executor - Shut down executor word-counter:[8 8]
26793 [main] INFO o.a.s.d.executor - Shutting down executor character-counter:[2 2]
26794 [Thread-16-character-counter-executor[2 2]] INFO o.a.s.util - Async loop interrupted!
26794 [Thread-15-disruptor-executor[2 2]-send-queue] INFO o.a.s.util - Async loop interrupted!
★ -- Character Counter [3273]
26803 [main] INFO o.a.s.d.executor - Shut down executor character-counter:[2 2]
26811 [main] INFO o.a.s.d.executor - Shutting down executor word-counter:[7 7]
26811 [Thread-18-word-counter-executor[7 7]] INFO o.a.s.util - Async loop interrupted!
26812 [Thread-17-disruptor-executor[7 7]-send-queue] INFO o.a.s.util - Async loop interrupted!
★ -- Word Counter [word-counter-7]
★ -- Word Counter Xssssa : 2
26815 [main] INFO o.a.s.d.executor - Shut down executor word-counter:[7 7]
26815 [main] INFO o.a.s.d.executor - Shutting down executor line-counter:[3 3]
26816 [Thread-19-disruptor-executor[3 3]-send-queue] INFO o.a.s.util - Async loop interrupted!
26818 [Thread-20-line-counter-executor[3 3]] INFO o.a.s.util - Async loop interrupted!
★ -- LineCounter [96]
26821 [main] INFO o.a.s.d.executor - Shut down executor line-counter:[3 3]
26821 [main] INFO o.a.s.d.executor - Shutting down executor __acker:[1 1]
26822 [Thread-22-__acker-executor[1 1]] INFO o.a.s.util - Async loop interrupted!
26822 [Thread-21-disruptor-executor[1 1]-send-queue] INFO o.a.s.util - Async loop interrupted!
26823 [main] INFO o.a.s.d.executor - Shut down executor __acker:[1 1]
26823 [main] INFO o.a.s.d.executor - Shutting down executor line-reader-spout:[6 6]
26823 [Thread-23-disruptor-executor[6 6]-send-queue] INFO o.a.s.util - Async loop interrupted!
26825 [Thread-24-line-reader-spout-executor[6 6]] INFO o.a.s.util - Async loop interrupted!
close()
26827 [main] INFO o.a.s.d.executor - Shut down executor line-reader-spout:[6 6]
26827 [main] INFO o.a.s.d.executor - Shutting down executor word-counter:[9 9]
26827 [Thread-26-word-counter-executor[9 9]] INFO o.a.s.util - Async loop interrupted!
26827 [Thread-25-disruptor-executor[9 9]-send-queue] INFO o.a.s.util - Async loop interrupted!
★ -- Word Counter [word-counter-9]
★ -- Word Counter 8890 : 5
26828 [main] INFO o.a.s.d.executor - Shut down executor word-counter:[9 9]
26828 [main] INFO o.a.s.d.executor - Shutting down executor word-splitter:[11 11]
26828 [Thread-28-word-splitter-executor[11 11]] INFO o.a.s.util - Async loop interrupted!
26828 [Thread-27-disruptor-executor[11 11]-send-queue] INFO o.a.s.util - Async loop interrupted!
26829 [main] INFO o.a.s.d.executor - Shut down executor word-splitter:[11 11]
26830 [main] INFO o.a.s.d.executor - Shutting down executor __system:[-1 -1]
26830 [Thread-30-__system-executor[-1 -1]] INFO o.a.s.util - Async loop interrupted!
26830 [Thread-29-disruptor-executor[-1 -1]-send-queue] INFO o.a.s.util - Async loop interrupted!
26837 [main] INFO o.a.s.d.executor - Shut down executor __system:[-1 -1]
26837 [main] INFO o.a.s.d.executor - Shutting down executor line-counter:[5 5]
26837 [Thread-32-line-counter-executor[5 5]] INFO o.a.s.util - Async loop interrupted!
26837 [Thread-31-disruptor-executor[5 5]-send-queue] INFO o.a.s.util - Async loop interrupted!
★ -- LineCounter [114]
26838 [main] INFO o.a.s.d.executor - Shut down executor line-counter:[5 5]
26838 [main] INFO o.a.s.d.executor - Shutting down executor word-splitter:[10 10]
26839 [Thread-34-word-splitter-executor[10 10]] INFO o.a.s.util - Async loop interrupted!
26839 [Thread-33-disruptor-executor[10 10]-send-queue] INFO o.a.s.util - Async loop interrupted!
26839 [main] INFO o.a.s.d.executor - Shut down executor word-splitter:[10 10]
26839 [main] INFO o.a.s.d.executor - Shutting down executor line-counter:[4 4]
26840 [Thread-36-line-counter-executor[4 4]] INFO o.a.s.util - Async loop interrupted!
26840 [Thread-35-disruptor-executor[4 4]-send-queue] INFO o.a.s.util - Async loop interrupted!
★ -- LineCounter [96]
・
・
(omitted)
・
・
26942 [main] INFO o.a.s.testing - Done shutting down in process zookeeper
26942 [main] INFO o.a.s.testing - Deleting temporary path /tmp/7c5244b0-1895-4c0e-b256-0eec3399422f
26947 [main] INFO o.a.s.testing - Deleting temporary path /tmp/df56a758-f853-4bf8-8b85-05cc6a11a5e0
26948 [main] INFO o.a.s.testing - Deleting temporary path /tmp/1d84bb0a-32f0-409d-8ba2-f30b443d8dc5
26949 [main] INFO o.a.s.testing - Deleting temporary path /tmp/d5ee5bf2-bb0a-4d33-9c08-b6712d461fe4
28419 [SessionTracker] INFO o.a.s.s.o.a.z.s.SessionTrackerImpl - SessionTrackerImpl exited loop!
※ This shows that the line, word, and character counts were produced.
※ Summing the output of each thread gives the overall totals; for example, the three LineCounter threads report 96, 114, and 96 lines, i.e. 306 lines in total.
4. Sample Source
HelloStorm.java
package storm.starter;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
public class HelloStorm {
public static String TOPOLOGY_NAME_HelloStorm="HelloStorm";
public static void main(String[] args) throws Exception {
if (args.length < 3) {
System.out.println("args[0]=inputFile,args[1]=sleepTime(ms),args[2]=LocalCluster:StormSubmitter=0:1");
System.exit(1);
}
Config config = new Config();
config.put("inputFile", args[0]);
config.setDebug(true);
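// Allow at most one pending (un-acked) tuple in flight per spout task (Config.TOPOLOGY_MAX_SPOUT_PENDING).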
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
System.out.println("[HelloStorm]TopologyBuilder[Start]");
TopologyBuilder builder = new TopologyBuilder();
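// Wire the pipeline described in the architecture section above: the spout feeds word-splitter and line-counter; word-splitter feeds word-counter (grouped by word) and character-counter.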
builder.setSpout("line-reader-spout", new LineReaderSpout());
builder.setBolt("word-splitter", new WordSplitterBolt(),2).shuffleGrouping("line-reader-spout");
builder.setBolt("line-counter", new LineCounterBolt(),3).shuffleGrouping("line-reader-spout");
builder.setBolt("word-counter", new WordCounterBolt(),3).fieldsGrouping("word-splitter",new Fields("word"));
builder.setBolt("character-counter", new CharacterCounterBolt()).shuffleGrouping("word-splitter");
System.out.println("[HelloStorm]TopologyBuilder[End]");
boolean flgLocalCluster= true;
try{
if(args[2].equals("1"))
flgLocalCluster= false;
}catch(Exception e){
}
if(flgLocalCluster){
System.out.println("[HelloStorm]LocalCluster[Start]");
System.out.println("[HelloStorm]TOPOLOGY_NAME="+TOPOLOGY_NAME_HelloStorm);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME_HelloStorm, config, builder.createTopology());
System.out.println("[HelloStorm]LocalCluster.submitTopology()[End]");
long ms = 10000;
try {
ms = Long.parseLong(args[1]);
} catch (Exception e) {
}
System.out.println("[HelloStorm]sleep(" + ms + ")[Start]");
Thread.sleep(ms);
cluster.killTopology(TOPOLOGY_NAME_HelloStorm);
System.out.println("[HelloStorm]cluster.shutdown()");
cluster.shutdown();
}else{
System.out.println("[HelloStorm]StormSubmitter[Start]");
StormSubmitter.submitTopology(TOPOLOGY_NAME_HelloStorm, config, builder.createTopology());
System.out.println("[HelloStorm]StormSubmitter[End]");
}
}
}
LineReaderSpout.java
package storm.starter;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
public class LineReaderSpout implements IRichSpout {
private SpoutOutputCollector collector;
private FileReader fileReader;
private boolean completed = false;
private TopologyContext context;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
try {
System.out.println("[LineReaderSpout]inputFile=" + conf.get("inputFile").toString());
this.context = context;
this.fileReader = new FileReader(conf.get("inputFile").toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file " + conf.get("inputFile"));
}
this.collector = collector;
}
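// Read the whole input file and emit one tuple per line; the line text is also passed as the message ID, so ack()/fail() are reported per line.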
@Override
public void nextTuple() {
System.out.println("nextTuple()");
if (completed) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
String str;
BufferedReader reader = new BufferedReader(fileReader);
try {
while ((str = reader.readLine()) != null) {
System.out.println("str=" + str);
this.collector.emit(new Values(str), str);
}
} catch (Exception e) {
throw new RuntimeException("Error reading typle", e);
} finally {
completed = true;
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
@Override
public void close() {
System.out.println("close()");
try {
fileReader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public boolean isDistributed() {
return false;
}
@Override
public void activate() {
// TODO Auto-generated method stub
}
@Override
public void deactivate() {
// TODO Auto-generated method stub
}
@Override
public void ack(Object msgId) {
System.out.println("Got ACK for msgId : " + msgId);
}
@Override
public void fail(Object msgId) {
System.out.println("Got FAIL for msgId : " + msgId);
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
LineCounterBolt.java
package storm.starter;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
public class LineCounterBolt implements IRichBolt{
Integer id;
String name;
long lineCount;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
}
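// Count every line tuple received by this task; the per-task total is printed in cleanup().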
@Override
public void execute(Tuple input) {
String str = input.getString(0);
System.out.println("LineCounterBolt str = ["+ str + "]");
lineCount++;
collector.ack(input);
}
@Override
public void cleanup() {
System.out.println("★ -- LineCounter ["+ lineCount + "]");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
WordSplitterBolt.java
package storm.starter;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class WordSplitterBolt implements IRichBolt{
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
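// Split each incoming line tuple on commas and emit one tuple per non-empty word.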
@Override
public void execute(Tuple input) {
String sentence = input.getString(0);
System.out.println("sentence=" + sentence);
String[] words = sentence.split(",");
for(String word: words){
word = word.trim();
System.out.println("word=" + word);
if(!word.isEmpty()){
System.out.println("■collector.emit■word=" + word);
collector.emit(new Values(word));
}
}
collector.ack(input);
}
@Override
public void cleanup() {
// TODO Auto-generated method stub
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
WordCounterBolt.java
package storm.starter;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
public class WordCounterBolt implements IRichBolt{
Integer id;
String name;
Map<String, Integer> counters;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
this.collector = collector;
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
}
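// Count occurrences per word in a HashMap; fieldsGrouping on "word" routes all tuples carrying the same word to the same task, so each per-task count is complete for the words it owns.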
@Override
public void execute(Tuple input) {
String str = input.getString(0);
if(!counters.containsKey(str)){
counters.put(str, 1);
}else{
Integer c = counters.get(str) +1;
counters.put(str, c);
}
collector.ack(input);
}
@Override
public void cleanup() {
System.out.println("★ -- Word Counter ["+ name + "-"+id +"]");
for(Map.Entry<String, Integer> entry:counters.entrySet()){
System.out.println("★ -- Word Counter \t"+entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
CharacterCounterBolt.java
package storm.starter;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
public class CharacterCounterBolt implements IRichBolt{
Integer id;
String name;
long characters;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
}
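// Accumulate the total number of characters across all word tuples; the total is printed in cleanup().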
@Override
public void execute(Tuple input) {
String str = input.getString(0);
long size = str.length();
System.out.println("str=["+ str +"] Size="+size);
characters = characters + size;
collector.ack(input);
}
@Override
public void cleanup() {
System.out.println("★ -- Character Counter ["+characters +"]");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
5. Downloads
◎ HelloStorm.java >>Hellostorm.java
◎ LineReaderSpout.java >>LineReaderSpout.java
◎ LineCounterBolt.java >>wordsplitterbolt.java
◎ WordSplitterBolt.java >>WordSplitterBolt.java
◎ WordCounterBolt.java >>WordCounterBolt.java
◎ CharacterCounterBolt.java >>CharacterCounterBolt.java
◎ test.txt >>test.txt
◎ Output log >>8fo97cd838d83o.log