Apache Kafka: Overview and Operation Check
This document gives an overview of Apache Kafka and explains how to verify that it works.
Contents
1. Overview
2. Development Environment Setup
3. Operation Check
4. Reference
5. Source Code
6. Future Work
1. Overview
Apache Kafka is a Publisher/Subscriber-style distributed messaging system originally developed at LinkedIn.
Its core is the broker, which mediates messages between clients. Brokers use Apache ZooKeeper for cluster coordination, and fault tolerance is achieved by clustering the brokers.
Messages are persisted to disk as files and replicated across the brokers in the cluster, which protects against data loss.
A Producer sends messages to a broker, where they accumulate in queues called Topics.
A Consumer fetches messages from the broker by polling, so it can retrieve messages at a rate that matches its own processing capacity (parallelism, throughput), and the broker does not need to track each consumer's transfer rate.
When Producers send messages faster than Consumers can process them, the broker acts as a buffer and the messages accumulate in the topic.
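The polling model above can be illustrated with a small stdlib-only sketch that uses no Kafka dependency: a bounded queue stands in for a topic, the producer pushes messages into it, and the consumer drains it at its own pace. All class and variable names here are illustrative and are not part of the Kafka API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative model of the broker's topic queue: the producer pushes,
// the consumer polls at its own pace, and the bounded buffer absorbs bursts.
public class TopicQueueSketch {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> topic = new ArrayBlockingQueue<>(100); // the "Topic"

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    topic.put("msg-" + i); // blocks only when the buffer is full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();
        producer.join();

        // Consumer side: poll messages one by one, as fast (or slow) as it likes.
        List<String> received = new ArrayList<>();
        String m;
        while ((m = topic.poll(100, TimeUnit.MILLISECONDS)) != null) {
            received.add(m);
        }
        System.out.println(received);
    }
}
```

In the real system the bounded buffer is the on-disk topic log, so "full" effectively never happens within the retention period; the point of the sketch is only that the consumer, not the broker, decides the read rate.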
2. Development Environment Setup
Installing Scala
[root@Alabaster ~]# cd /tmp
[root@Alabaster tmp]# ls
scala-2.12.4.tgz
[root@Alabaster tmp]# tar xvzf scala-2.12.4.tgz
scala-2.12.4/
scala-2.12.4/lib/
scala-2.12.4/lib/scala-swing_2.12-2.0.0.jar
scala-2.12.4/lib/scala-reflect.jar
scala-2.12.4/lib/scala-parser-combinators_2.12-1.0.6.jar
scala-2.12.4/lib/scala-compiler.jar
scala-2.12.4/lib/scala-library.jar
scala-2.12.4/lib/scala-xml_2.12-1.0.6.jar
scala-2.12.4/lib/scalap-2.12.4.jar
scala-2.12.4/lib/jline-2.14.5.jar
scala-2.12.4/bin/
scala-2.12.4/bin/scala
scala-2.12.4/bin/scalac.bat
scala-2.12.4/bin/scala.bat
scala-2.12.4/bin/scalap
scala-2.12.4/bin/scalap.bat
scala-2.12.4/bin/scaladoc.bat
scala-2.12.4/bin/fsc
scala-2.12.4/bin/fsc.bat
scala-2.12.4/bin/scalac
scala-2.12.4/bin/scaladoc
scala-2.12.4/man/
scala-2.12.4/man/man1/
scala-2.12.4/man/man1/scalac.1
scala-2.12.4/man/man1/scaladoc.1
scala-2.12.4/man/man1/fsc.1
scala-2.12.4/man/man1/scala.1
scala-2.12.4/man/man1/scalap.1
scala-2.12.4/doc/
scala-2.12.4/doc/tools/
scala-2.12.4/doc/tools/scala.html
scala-2.12.4/doc/tools/index.html
scala-2.12.4/doc/tools/images/
scala-2.12.4/doc/tools/images/scala_logo.png
scala-2.12.4/doc/tools/images/external.gif
scala-2.12.4/doc/tools/fsc.html
scala-2.12.4/doc/tools/scalac.html
scala-2.12.4/doc/tools/css/
scala-2.12.4/doc/tools/css/style.css
scala-2.12.4/doc/tools/scalap.html
scala-2.12.4/doc/tools/scaladoc.html
scala-2.12.4/doc/License.rtf
scala-2.12.4/doc/licenses/
scala-2.12.4/doc/licenses/bsd_asm.txt
scala-2.12.4/doc/licenses/mit_jquery.txt
scala-2.12.4/doc/licenses/bsd_jline.txt
scala-2.12.4/doc/licenses/mit_sizzle.txt
scala-2.12.4/doc/licenses/mit_tools.tooltip.txt
scala-2.12.4/doc/licenses/apache_jansi.txt
scala-2.12.4/doc/LICENSE.md
scala-2.12.4/doc/README
[root@Alabaster tmp]# mv scala-2.12.4 /usr/local/scala-2.12.4
[root@Alabaster local]# ln -s /usr/local/scala-2.12.4 /usr/local/scala
[root@Alabaster local]# cd /usr/local/scala
[root@Alabaster scala]# cd bin
[root@Alabaster bin]# ls
fsc fsc.bat scala scala.bat scalac scalac.bat scaladoc scaladoc.bat scalap scalap.bat
[root@Alabaster bin]# ./scala -version
Scala code runner version 2.12.4 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc.
Installing Apache Kafka
Download the Apache Kafka binary distribution. Note: choose the archive that matches your Scala version.
◎ Apache Kafka official site
◎ Apache official site
[root@Alabaster tmp]# tar xvzf kafka_2.12-0.11.0.1.gz
kafka_2.12-0.11.0.1/
kafka_2.12-0.11.0.1/LICENSE
kafka_2.12-0.11.0.1/NOTICE
kafka_2.12-0.11.0.1/bin/
kafka_2.12-0.11.0.1/bin/connect-distributed.sh
kafka_2.12-0.11.0.1/bin/connect-standalone.sh
kafka_2.12-0.11.0.1/bin/kafka-acls.sh
kafka_2.12-0.11.0.1/bin/kafka-broker-api-versions.sh
kafka_2.12-0.11.0.1/bin/kafka-configs.sh
kafka_2.12-0.11.0.1/bin/kafka-console-consumer.sh
kafka_2.12-0.11.0.1/bin/kafka-console-producer.sh
kafka_2.12-0.11.0.1/bin/kafka-consumer-groups.sh
kafka_2.12-0.11.0.1/bin/kafka-consumer-offset-checker.sh
kafka_2.12-0.11.0.1/bin/kafka-consumer-perf-test.sh
kafka_2.12-0.11.0.1/bin/kafka-delete-records.sh
kafka_2.12-0.11.0.1/bin/kafka-mirror-maker.sh
kafka_2.12-0.11.0.1/bin/kafka-preferred-replica-election.sh
kafka_2.12-0.11.0.1/bin/kafka-producer-perf-test.sh
kafka_2.12-0.11.0.1/bin/kafka-reassign-partitions.sh
kafka_2.12-0.11.0.1/bin/kafka-replay-log-producer.sh
kafka_2.12-0.11.0.1/bin/kafka-replica-verification.sh
kafka_2.12-0.11.0.1/bin/kafka-run-class.sh
kafka_2.12-0.11.0.1/bin/kafka-server-start.sh
kafka_2.12-0.11.0.1/bin/kafka-server-stop.sh
kafka_2.12-0.11.0.1/bin/kafka-simple-consumer-shell.sh
kafka_2.12-0.11.0.1/bin/kafka-streams-application-reset.sh
kafka_2.12-0.11.0.1/bin/kafka-topics.sh
kafka_2.12-0.11.0.1/bin/kafka-verifiable-consumer.sh
kafka_2.12-0.11.0.1/bin/kafka-verifiable-producer.sh
kafka_2.12-0.11.0.1/bin/windows/
kafka_2.12-0.11.0.1/bin/windows/connect-distributed.bat
kafka_2.12-0.11.0.1/bin/windows/connect-standalone.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-acls.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-broker-api-versions.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-configs.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-console-consumer.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-console-producer.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-consumer-groups.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-consumer-offset-checker.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-consumer-perf-test.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-mirror-maker.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-preferred-replica-election.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-producer-perf-test.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-reassign-partitions.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-replay-log-producer.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-replica-verification.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-run-class.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-server-start.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-server-stop.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-simple-consumer-shell.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-topics.bat
kafka_2.12-0.11.0.1/bin/windows/zookeeper-server-start.bat
kafka_2.12-0.11.0.1/bin/windows/zookeeper-server-stop.bat
kafka_2.12-0.11.0.1/bin/windows/zookeeper-shell.bat
kafka_2.12-0.11.0.1/bin/zookeeper-security-migration.sh
kafka_2.12-0.11.0.1/bin/zookeeper-server-start.sh
kafka_2.12-0.11.0.1/bin/zookeeper-server-stop.sh
kafka_2.12-0.11.0.1/bin/zookeeper-shell.sh
kafka_2.12-0.11.0.1/config/
kafka_2.12-0.11.0.1/config/connect-console-sink.properties
kafka_2.12-0.11.0.1/config/connect-console-source.properties
kafka_2.12-0.11.0.1/config/connect-distributed.properties
kafka_2.12-0.11.0.1/config/connect-file-sink.properties
kafka_2.12-0.11.0.1/config/connect-file-source.properties
kafka_2.12-0.11.0.1/config/connect-log4j.properties
kafka_2.12-0.11.0.1/config/connect-standalone.properties
kafka_2.12-0.11.0.1/config/consumer.properties
kafka_2.12-0.11.0.1/config/log4j.properties
kafka_2.12-0.11.0.1/config/producer.properties
kafka_2.12-0.11.0.1/config/server.properties
kafka_2.12-0.11.0.1/config/tools-log4j.properties
kafka_2.12-0.11.0.1/config/zookeeper.properties
kafka_2.12-0.11.0.1/libs/
kafka_2.12-0.11.0.1/libs/kafka-clients-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/jopt-simple-5.0.3.jar
kafka_2.12-0.11.0.1/libs/metrics-core-2.2.0.jar
kafka_2.12-0.11.0.1/libs/scala-library-2.12.2.jar
kafka_2.12-0.11.0.1/libs/slf4j-log4j12-1.7.25.jar
kafka_2.12-0.11.0.1/libs/zkclient-0.10.jar
kafka_2.12-0.11.0.1/libs/zookeeper-3.4.10.jar
kafka_2.12-0.11.0.1/libs/scala-parser-combinators_2.12-1.0.4.jar
kafka_2.12-0.11.0.1/libs/lz4-1.3.0.jar
kafka_2.12-0.11.0.1/libs/snappy-java-1.1.2.6.jar
kafka_2.12-0.11.0.1/libs/slf4j-api-1.7.25.jar
kafka_2.12-0.11.0.1/libs/log4j-1.2.17.jar
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1.jar.asc
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-sources.jar
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-sources.jar.asc
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-javadoc.jar
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-javadoc.jar.asc
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-test.jar
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-test.jar.asc
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-test-sources.jar
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-test-sources.jar.asc
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-scaladoc.jar
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-scaladoc.jar.asc
kafka_2.12-0.11.0.1/site-docs/
kafka_2.12-0.11.0.1/site-docs/kafka_2.12-0.11.0.1-site-docs.tgz
kafka_2.12-0.11.0.1/libs/kafka-tools-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/kafka-log4j-appender-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/argparse4j-0.7.0.jar
kafka_2.12-0.11.0.1/libs/jackson-databind-2.8.5.jar
kafka_2.12-0.11.0.1/libs/jackson-annotations-2.8.5.jar
kafka_2.12-0.11.0.1/libs/jackson-core-2.8.5.jar
kafka_2.12-0.11.0.1/libs/connect-api-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/connect-runtime-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/connect-transforms-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/jackson-jaxrs-json-provider-2.8.5.jar
kafka_2.12-0.11.0.1/libs/jersey-container-servlet-2.24.jar
kafka_2.12-0.11.0.1/libs/jetty-server-9.2.15.v20160210.jar
kafka_2.12-0.11.0.1/libs/jetty-servlet-9.2.15.v20160210.jar
kafka_2.12-0.11.0.1/libs/jetty-servlets-9.2.15.v20160210.jar
kafka_2.12-0.11.0.1/libs/reflections-0.9.11.jar
kafka_2.12-0.11.0.1/libs/maven-artifact-3.5.0.jar
kafka_2.12-0.11.0.1/libs/jackson-jaxrs-base-2.8.5.jar
kafka_2.12-0.11.0.1/libs/jackson-module-jaxb-annotations-2.8.5.jar
kafka_2.12-0.11.0.1/libs/jersey-container-servlet-core-2.24.jar
kafka_2.12-0.11.0.1/libs/jersey-common-2.24.jar
kafka_2.12-0.11.0.1/libs/jersey-server-2.24.jar
kafka_2.12-0.11.0.1/libs/javax.ws.rs-api-2.0.1.jar
kafka_2.12-0.11.0.1/libs/javax.servlet-api-3.1.0.jar
kafka_2.12-0.11.0.1/libs/jetty-http-9.2.15.v20160210.jar
kafka_2.12-0.11.0.1/libs/jetty-io-9.2.15.v20160210.jar
kafka_2.12-0.11.0.1/libs/jetty-security-9.2.15.v20160210.jar
kafka_2.12-0.11.0.1/libs/jetty-continuation-9.2.15.v20160210.jar
kafka_2.12-0.11.0.1/libs/jetty-util-9.2.15.v20160210.jar
kafka_2.12-0.11.0.1/libs/guava-20.0.jar
kafka_2.12-0.11.0.1/libs/javassist-3.21.0-GA.jar
kafka_2.12-0.11.0.1/libs/plexus-utils-3.0.24.jar
kafka_2.12-0.11.0.1/libs/commons-lang3-3.5.jar
kafka_2.12-0.11.0.1/libs/javax.inject-2.5.0-b05.jar
kafka_2.12-0.11.0.1/libs/javax.annotation-api-1.2.jar
kafka_2.12-0.11.0.1/libs/jersey-guava-2.24.jar
kafka_2.12-0.11.0.1/libs/hk2-api-2.5.0-b05.jar
kafka_2.12-0.11.0.1/libs/hk2-locator-2.5.0-b05.jar
kafka_2.12-0.11.0.1/libs/osgi-resource-locator-1.0.1.jar
kafka_2.12-0.11.0.1/libs/jersey-client-2.24.jar
kafka_2.12-0.11.0.1/libs/jersey-media-jaxb-2.24.jar
kafka_2.12-0.11.0.1/libs/validation-api-1.1.0.Final.jar
kafka_2.12-0.11.0.1/libs/hk2-utils-2.5.0-b05.jar
kafka_2.12-0.11.0.1/libs/aopalliance-repackaged-2.5.0-b05.jar
kafka_2.12-0.11.0.1/libs/javax.inject-1.jar
kafka_2.12-0.11.0.1/libs/connect-json-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/connect-file-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/kafka-streams-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/rocksdbjni-5.0.1.jar
kafka_2.12-0.11.0.1/libs/kafka-streams-examples-0.11.0.1.jar
[root@Alabaster tmp]# mv kafka_2.12-0.11.0.1 /usr/local/
[root@Alabaster local]# ln -s /usr/local/kafka_2.12-0.11.0.1 /usr/local/kafka
3. Operation Check
Starting ZooKeeper
[root@Alabaster bin]# pwd
/usr/local/kafka/bin
[root@Alabaster bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
Starting Kafka
[root@Alabaster bin]# ./kafka-server-start.sh -daemon ../config/server.properties
Creating a Topic (a Topic named test01)
[root@Alabaster bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test01
Created topic "test01".
[root@Alabaster bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
test01
Operation check (sending and receiving messages with the console tools bundled with Kafka)
◎ Sending messages
[root@Alabaster bin]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test01
>aaaa
>dddd
>exit
>quit
<Ctrl+C>
◎ Receiving messages
[root@Alabaster bin]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test01 --from-beginning
aaaa
dddd
exit
quit
^C Processed a total of 4 messages
Message send/receive application (Java version)
Sending messages from the Producer
[root@Alabaster tmp]# java -cp XXXXXX jp.a_frontier.kafka.sample.SampleProducer ABC 7
[main]Start
msg=[ABC]
count=7
[Start]constructor
[Start]configure()
[End]configure()
[End]constructor
msg=ABC
messageCount=7
ProducerRecord[Maked]
i=0
i=1
i=2
i=3
i=4
i=5
i=6
for[End]
flush[End]
elapsedTime=474
close[End]
[main]End
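The log above (ABC0 through ABC6) suggests that SampleProducer appends the loop index to the base message and sends one record per iteration. Since the original source is not reproduced here, the following is a hypothetical stdlib-only reconstruction of the payload construction and a typical 0.11-era producer configuration; the actual KafkaProducer.send() call is shown only as a comment, so the snippet runs without a broker or the kafka-clients jar.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch of what SampleProducer likely does with its
// "msg" and "count" arguments (ABC 7 -> ABC0 .. ABC6).
public class SampleProducerSketch {
    public static void main(String[] args) {
        String msg = "ABC";
        int messageCount = 7;

        // Typical producer configuration for Kafka 0.11 (assumed values).
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        List<String> payloads = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            payloads.add(msg + i);
            // With kafka-clients on the classpath, each payload would be sent as:
            // producer.send(new ProducerRecord<>("test01", msg + i));
        }
        System.out.println(payloads);
    }
}
```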
The console consumer fetches and displays the messages from the topic
[root@Alabaster ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test01
ABC0
ABC1
ABC2
ABC3
ABC4
ABC5
ABC6
^CProcessed a total of 7 messages
Fetching messages with the Java Consumer
[root@Alabaster ~]# java -cp XXXXX jp.a_frontier.kafka.sample.SimpleConsumer
run():Topic=test01
run():subscribe
run():records
Received message: (null, aaaa) at offset 0
Received message: (null, dddd) at offset 1
Received message: (null, exit) at offset 2
Received message: (null, quit) at offset 3
Received message: (null, abcd) at offset 4
Received message: (null, dddd) at offset 5
Received message: (null, dddddd) at offset 6
Received message: (null, fdfe) at offset 7
Received message: (null, sdfsafe) at offset 8
Received message: (null, kfdsajfie) at offset 9
Received message: (null, a) at offset 10
Received message: (null, a) at offset 11
・
・
・
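SimpleConsumer's output shows null keys and sequential offsets, which is consistent with a plain subscribe/poll loop reading test01 from the beginning. A 0.11-era consumer typically needs at least the properties sketched below; every value here is an assumption for this environment, and the poll loop appears only as comments so the snippet runs standalone without the kafka-clients jar.

```java
import java.util.Properties;

// Assumed configuration for a SimpleConsumer-style client (all values illustrative).
public class SimpleConsumerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "sample-group");  // hypothetical group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");  // read from offset 0, as seen in the log

        // With kafka-clients on the classpath, the loop would look roughly like:
        // KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // consumer.subscribe(Collections.singletonList("test01"));
        // while (true) {
        //     for (ConsumerRecord<String, String> r : consumer.poll(1000)) {
        //         System.out.printf("Received message: (%s, %s) at offset %d%n",
        //                           r.key(), r.value(), r.offset());
        //     }
        // }

        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```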
4. Reference
[root@Alabaster ~]# /usr/local/kafka/bin/kafka-console-consumer.sh
The console consumer is a tool that reads data from Kafka and outputs it to standard output.
Option Description
------ -----------
--blacklist <String: blacklist> Blacklist of topics to exclude from
consumption.
--bootstrap-server <String: server to REQUIRED (unless old consumer is
connect to> used): The server to connect to.
--consumer-property <String: A mechanism to pass user-defined
consumer_prop> properties in the form key=value to
the consumer.
--consumer.config <String: config file> Consumer config properties file. Note
that [consumer-property] takes
precedence over this config.
--csv-reporter-enabled If set, the CSV metrics reporter will
be enabled
--delete-consumer-offsets If specified, the consumer path in
zookeeper is deleted when starting up
--enable-systest-events Log lifecycle events of the consumer
in addition to logging consumed
messages. (This is specific for
system tests.)
--formatter <String: class> The name of a class to use for
formatting kafka messages for
display. (default: kafka.tools.
DefaultMessageFormatter)
--from-beginning If the consumer does not already have
an established offset to consume
from, start with the earliest
message present in the log rather
than the latest message.
--isolation-level <String> Set to read_committed in order to
filter out transactional messages
which are not committed. Set to
read_uncommittedto read all
messages. (default: read_uncommitted)
--key-deserializer <String:
deserializer for key>
--max-messages <Integer: num_messages> The maximum number of messages to
consume before exiting. If not set,
consumption is continual.
--metrics-dir <String: metrics If csv-reporter-enable is set, and
directory> this parameter isset, the csv
metrics will be output here
--new-consumer Use the new consumer implementation.
This is the default.
--offset <String: consume offset> The offset id to consume from (a non-
negative number), or 'earliest'
which means from beginning, or
'latest' which means from end
(default: latest)
--partition <Integer: partition> The partition to consume from.
Consumption starts from the end of
the partition unless '--offset' is
specified.
--property <String: prop> The properties to initialize the
message formatter.
--skip-message-on-error If there is an error when processing a
message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms> If specified, exit if no message is
available for consumption for the
specified interval.
--topic <String: topic> The topic id to consume on.
--value-deserializer <String:
deserializer for values>
--whitelist <String: whitelist> Whitelist of topics to include for
consumption.
--zookeeper <String: urls> REQUIRED (only when using old
consumer): The connection string for
the zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over.
5. Source Code
◎ sampleproducer.java
◎ simpleconsumer.java
6. Future Work
◎ Message visualization with Trifecta
◎ Key design considerations for topic partitioning and clustering units
◎ Performance of a multi-node ZooKeeper cluster
◎ Operational monitoring items
◎ Nginx → Tomcat → Kafka → Storm integration