Deng Mo'er's Wiki World

Personal log, notes, and blog. By dume2007@gmail.com


I. Publish-Subscribe Messaging System 2016-09-20

1) ZooKeeper cluster

Start 3 servers

sudo bin/zookeeper-server-start.sh config/zookeeper.properties >> /tmp/zookeeper-startup.log &
sudo bin/zookeeper-server-start.sh config/zookeeper-1.properties >> /tmp/zookeeper-startup-1.log &
sudo bin/zookeeper-server-start.sh config/zookeeper-2.properties >> /tmp/zookeeper-startup-2.log &

Listening ports

127.0.0.1:2181 127.0.0.1:2182 127.0.0.1:2183
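
The three zookeeper*.properties files are not shown in these notes. A minimal sketch of what they might look like for a three-member ensemble on one host. The file names and client ports follow the commands above; the data directories, the initLimit/syncLimit values, and the peer/election ports (2888-2890, 3888-3890) are assumptions, and write_zk_configs is a hypothetical helper:

```shell
# Sketch: generate three ZooKeeper configs for a single-host ensemble.
# Data directories and peer/election ports are assumptions, not taken
# from the original setup.
write_zk_configs() {
  # $1: directory for the .properties files, $2: root for data directories
  zk_conf_dir="$1"
  zk_data_root="$2"
  for i in 0 1 2; do
    # The first instance uses zookeeper.properties, the others get a -N suffix.
    if [ "$i" -eq 0 ]; then suffix=""; else suffix="-$i"; fi
    mkdir -p "$zk_data_root/zk$i"
    # Each ensemble member needs a unique id stored in <dataDir>/myid.
    echo "$((i + 1))" > "$zk_data_root/zk$i/myid"
    cat > "$zk_conf_dir/zookeeper$suffix.properties" <<EOF
dataDir=$zk_data_root/zk$i
clientPort=$((2181 + i))
initLimit=5
syncLimit=2
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
EOF
  done
}

# Write into a throwaway directory and show one of the generated files.
zk_out="$(mktemp -d)"
write_zk_configs "$zk_out" "$zk_out/data"
cat "$zk_out/zookeeper-1.properties"
```

Each member's id in the myid file has to match its server.N line, which is why the helper writes both together.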

The command actually executed

bin/kafka-run-class.sh -name zookeeper -loggc org.apache.zookeeper.server.quorum.QuorumPeerMain config/zookeeper-1.properties

Find and kill the process

ps aux|grep zookeeper-server-start
sudo kill -s TERM pid
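
The two steps above can be combined so the PID does not have to be copied by hand. A small sketch: pids_matching is a hypothetical helper that reads `ps aux`-style lines on stdin and prints the PID column for matching lines, skipping the grep process itself. The sample lines are made up so the helper can be tried without a running server:

```shell
# Sketch: print PIDs of processes whose command line contains a substring.
pids_matching() {
  # $1: substring to look for; input: `ps aux` output on stdin
  awk -v pat="$1" 'index($0, pat) && $0 !~ /grep/ && $2 ~ /^[0-9]+$/ { print $2 }'
}

# Made-up sample output, for illustration only.
sample='USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
root 4242 0.1 1.0 1000 100 ? Sl 10:00 0:01 /bin/sh bin/zookeeper-server-start.sh config/zookeeper.properties
me 4300 0.0 0.0 100 10 pts/0 S+ 10:01 0:00 grep zookeeper-server-start'

printf '%s\n' "$sample" | pids_matching zookeeper-server-start
# prints 4242

# Against the real process table it could be used like this:
# for pid in $(ps aux | pids_matching zookeeper-server-start); do
#   sudo kill -s TERM "$pid"
# done
```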

Local Java environment directory

/usr/lib/jvm/default-java

2) Kafka cluster

Start 3 Kafka brokers

sudo env JMX_PORT=10000 bin/kafka-server-start.sh config/server.properties >> /tmp/kafka-startup.log &
sudo env JMX_PORT=10001 bin/kafka-server-start.sh config/server-1.properties >> /tmp/kafka-startup-1.log &
sudo env JMX_PORT=10002 bin/kafka-server-start.sh config/server-2.properties >> /tmp/kafka-startup-2.log &

ps aux|grep kafka-server-start.sh

Listening ports

PLAINTEXT://127.0.0.1:9092 PLAINTEXT://127.0.0.1:9093 PLAINTEXT://127.0.0.1:9094
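
The three server*.properties files are not shown in these notes. A minimal sketch of what they might contain for three brokers on one host, using the listener ports above. The broker ids 0 to 2, the log directories under /tmp, and the ZooKeeper connect string built from the three ports listed earlier are assumptions; write_broker_configs is a hypothetical helper:

```shell
# Sketch: generate per-broker configs that differ only in id, port, and log dir.
write_broker_configs() {
  # $1: directory for the .properties files
  broker_conf_dir="$1"
  for i in 0 1 2; do
    # The first broker uses server.properties, the others get a -N suffix.
    if [ "$i" -eq 0 ]; then suffix=""; else suffix="-$i"; fi
    cat > "$broker_conf_dir/server$suffix.properties" <<EOF
broker.id=$i
listeners=PLAINTEXT://127.0.0.1:$((9092 + i))
log.dirs=/tmp/kafka-logs-$i
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
EOF
  done
}

# Write into a throwaway directory and show one of the generated files.
broker_out="$(mktemp -d)"
write_broker_configs "$broker_out"
cat "$broker_out/server-2.properties"
```

Brokers sharing a host need distinct broker.id, listener port, and log.dirs values; everything else can stay identical.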

View topic information

bin/kafka-topics.sh --describe  --zookeeper 127.0.0.1:2181 --topic test

View the minimum (earliest) offset

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic test --time -2

View the maximum (latest) offset

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic test --time -1
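
GetOffsetShell prints one `topic:partition:offset` line per partition, so subtracting the `--time -2` result from the `--time -1` result gives the number of messages currently retained in each partition. A sketch, assuming the two outputs were saved to files; message_counts is a hypothetical helper and the offsets below are made-up sample values:

```shell
# Sketch: per-partition message count = latest offset - earliest offset.
message_counts() {
  # $1: file with the --time -2 (earliest) output
  # $2: file with the --time -1 (latest) output
  awk -F: 'NR == FNR { earliest[$1 ":" $2] = $3; next }
           { print $1 ":" $2 ":" ($3 - earliest[$1 ":" $2]) }' "$1" "$2"
}

# Made-up sample data, for illustration only.
offsets_tmp="$(mktemp -d)"
printf 'test:0:100\ntest:1:250\n' > "$offsets_tmp/earliest.txt"
printf 'test:0:180\ntest:1:250\n' > "$offsets_tmp/latest.txt"

message_counts "$offsets_tmp/earliest.txt" "$offsets_tmp/latest.txt"
# prints:
# test:0:80
# test:1:0
```

The lookup is keyed on topic and partition, so the two files do not need to list partitions in the same order.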

Reset the topic offset (when offsets are stored locally or on the broker)

$topic->consumeStart($partition, $offset);

Reset the topic offset (use this method when offsets are stored in ZooKeeper)

b) set /consumers/myConsumerGroup9/offsets/test/1 366380
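
The node path follows the pattern /consumers/<group>/offsets/<topic>/<partition>. A small sketch that builds the `set` command for any group, topic, and partition; zk_offset_set_cmd is a hypothetical helper, and the printed line is meant to be entered in a ZooKeeper shell session (for example one opened with bin/zookeeper-shell.sh 127.0.0.1:2181, which is an assumption about how the command above was run):

```shell
# Sketch: build the ZooKeeper `set` command for a consumer group offset.
zk_offset_set_cmd() {
  # $1: consumer group, $2: topic, $3: partition, $4: new offset
  printf 'set /consumers/%s/offsets/%s/%s %s\n' "$1" "$2" "$3" "$4"
}

zk_offset_set_cmd myConsumerGroup9 test 1 366380
# prints: set /consumers/myConsumerGroup9/offsets/test/1 366380
```

It is generally safer to stop the consumers in the group first, since a running consumer may overwrite the value on its next offset commit.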

3) Importing a data stream from a file with Kafka Connect

Start the Connect service

sudo bin/connect-distributed.sh config/connect-distributed.properties

Supported connectors

org.apache.kafka.connect.source.SourceConnector, org.apache.kafka.connect.sink.SinkConnector, org.apache.kafka.connect.tools.VerifiableSinkConnector, 
org.apache.kafka.connect.file.FileStreamSourceConnector, org.apache.kafka.connect.file.FileStreamSinkConnector, 
org.apache.kafka.connect.tools.VerifiableSourceConnector
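
In distributed mode, connectors are created through the worker's REST API rather than a properties file. A sketch of registering a FileStreamSourceConnector that reads a file into the test topic. The connector name, the file path /tmp/test.txt, and the REST address 127.0.0.1:8083 (the usual default) are assumptions; file_source_json is a hypothetical helper:

```shell
# Sketch: build the JSON body for a FileStreamSourceConnector.
file_source_json() {
  # $1: connector name, $2: file to read, $3: destination topic
  cat <<EOF
{
  "name": "$1",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "$2",
    "topic": "$3"
  }
}
EOF
}

# Show the request body.
file_source_json local-file-source /tmp/test.txt test

# To submit it to a running worker (REST address is an assumption):
# file_source_json local-file-source /tmp/test.txt test |
#   curl -s -X POST -H 'Content-Type: application/json' \
#        --data @- http://127.0.0.1:8083/connectors
```

Lines appended to the file afterwards should then show up as messages in the topic.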

4) Kafka performance testing

Test Kafka producer performance

bin/kafka-producer-perf-test.sh --topic test --num-records 500000 --record-size=51200 --producer-props bootstrap.servers=127.0.0.1:9092 --throughput 1000
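
Before starting a run with these parameters it helps to know how much data it will push and how long it has to take. Because --throughput caps the send rate at roughly 1000 records per second, the run cannot finish faster than num-records divided by throughput. A quick calculation with the values from the command above:

```shell
# Sketch: rough sizing of the producer perf run above.
num_records=500000
record_size=51200   # bytes per record
throughput=1000     # records per second (upper limit)

total_bytes=$((num_records * record_size))
bytes_per_sec=$((throughput * record_size))
min_seconds=$((num_records / throughput))

echo "total payload bytes: $total_bytes"
echo "target bytes per second: $bytes_per_sec"
echo "minimum run time in seconds: $min_seconds"
# prints:
# total payload bytes: 25600000000
# target bytes per second: 51200000
# minimum run time in seconds: 500
```

That is about 25.6 GB written to the topic, so the brokers' log directories need that much free space (times the replication factor).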

Test Kafka consumer performance

bin/kafka-consumer-perf-test.sh --topic test --messages 100000 --zookeeper 127.0.0.1:2181 --broker-list 127.0.0.1:9092 --group myConsumerGroup --threads 2

Managing with Kafka Manager

Install sbt

# Ubuntu
echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 642AC823
sudo apt-get update
sudo apt-get install sbt

# CentOS
curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
sudo yum install sbt

sudo sbt sbt-version -DsocksProxyHost=127.0.0.1 -DsocksProxyPort=1080

Download and build the manager

git clone https://github.com/yahoo/kafka-manager.git
cd kafka-manager
sbt clean dist

Edit the configuration file

After a successful build, a zip package is generated under target/universal. Unzip it and edit the configuration file.

unzip kafka-manager-1.3.0.4.zip
vim kafka-manager-1.3.0.4/conf/application.conf 
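
Instead of editing by hand, the ZooKeeper address could also be set with sed. A sketch, assuming application.conf contains a quoted kafka-manager.zkhosts="..." line; the sample line written below is an assumption about the shipped default, and set_zkhosts is a hypothetical helper:

```shell
# Sketch: point Kafka Manager at a ZooKeeper ensemble by rewriting one line.
set_zkhosts() {
  # $1: path to application.conf, $2: ZooKeeper connect string
  sed "s|^kafka-manager\.zkhosts=\".*\"|kafka-manager.zkhosts=\"$2\"|" "$1" > "$1.tmp" &&
    mv "$1.tmp" "$1"
}

# Try it on a throwaway file containing an assumed default line.
km_conf="$(mktemp)"
echo 'kafka-manager.zkhosts="kafka-manager-zookeeper:2181"' > "$km_conf"
set_zkhosts "$km_conf" "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"
cat "$km_conf"
# prints: kafka-manager.zkhosts="127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"
```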

Start the manager

sudo bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9001 &
sudo bin/kafka-manager -Dkafka-manager.zkhosts="127.0.0.1:2181" &

Default Web UI address: http://172.28.128.3:9000/

5) References