一、发布订阅消息系统 2016-09-20
1) zookeeper集群
启动3台服务器
sudo bin/zookeeper-server-start.sh config/zookeeper.properties >> /tmp/zookeeper-startup.log & sudo bin/zookeeper-server-start.sh config/zookeeper-1.properties >> /tmp/zookeeper-startup-1.log & sudo bin/zookeeper-server-start.sh config/zookeeper-2.properties >> /tmp/zookeeper-startup-2.log &
监听端口
127.0.0.1:2181 127.0.0.1:2182 127.0.0.1:2183
实际执行的命令
bin/kafka-run-class.sh -name zookeeper -loggc org.apache.zookeeper.server.quorum.QuorumPeerMain config/zookeeper-1.properties
查杀进程
ps aux|grep zookeeper-server-start sudo kill -s TERM pid
本地Java环境目录
/usr/lib/jvm/default-java
2)kafka集群
启动3台kafka broker
sudo env JMX_PORT=10000 bin/kafka-server-start.sh config/server.properties >> /tmp/kafka-startup.log & sudo env JMX_PORT=10001 bin/kafka-server-start.sh config/server-1.properties >> /tmp/kafka-startup-1.log & sudo env JMX_PORT=10002 bin/kafka-server-start.sh config/server-2.properties >> /tmp/kafka-startup-2.log &
ps aux|grep kafka-server-start.sh
监听端口
PLAINTEXT://127.0.0.1:9092 PLAINTEXT://127.0.0.1:9093 PLAINTEXT://127.0.0.1:9094
查看topic信息
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test
查看offset最小值
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic test --time -2
查看offset最大值
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic test --time -1
重设topic offset(将offset记录到本地或broker)
$topic->consumeStart($partition, $offset);
重设topic offset(将offset记录到zookeeper为此方式)
b) set /consumers/myConsumerGroup9/offsets/test/1 366380
3)Kafka Connect从文件导入数据流
启动Connect服务
sudo bin/connect-distributed.sh config/connect-distributed.properties
支持的Connector
org.apache.kafka.connect.source.SourceConnector, org.apache.kafka.connect.sink.SinkConnector, org.apache.kafka.connect.tools.VerifiableSinkConnector, org.apache.kafka.connect.file.FileStreamSourceConnector, org.apache.kafka.connect.file.FileStreamSinkConnector, org.apache.kafka.connect.tools.VerifiableSourceConnector
4)kafka性能测试
测试Kafka Producer的性能
bin/kafka-producer-perf-test.sh --topic test --num-records 500000 --record-size=51200 --producer-props bootstrap.servers=127.0.0.1:9092 --throughput 1000
测试Kafka Consumer的性能
bin/kafka-consumer-perf-test.sh --topic test --messages 100000 --zookeeper 127.0.0.1:2181 --broker-list 127.0.0.1:9092 --group myConsumerGroup --threads 2
Kafka Manager管理
安装sbt
# Ubuntu echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 642AC823 sudo apt-get update sudo apt-get install sbt # Centos curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo sudo yum install sbt sudo sbt sbt-version -DsocksProxyHost=127.0.0.1 -DsocksProxyPort=1080
下载编译manager
git clone https://github.com/yahoo/kafka-manager.git cd kafka-manager sbt clean dist
修改配置文件
编译成功后,会在target/universal下生成一个zip包,解压并修改配置文件。
unzip kafka-manager-1.3.0.4.zip vim kafka-manager-1.3.0.4/conf/application.conf
启动manager
sudo bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port 9001 & sudo bin/kafka-manager -Dkafka-manager.zkhosts="127.0.0.1:2181" &
默认Web UI地址:http://172.28.128.3:9000/