安装部署
1.jar包下载
http://kafka.apache.org/downloads.html
下载好后并上传到服务器中
2. 部署集群
1、解压安装包
> tar -zxvf kafka_2.11-2.4.1.tgz
2、在当前目录下创建logs文件,用于存储Kafa日志
> mkdir logs
3、修改配置文件
> vim /config/ server.properties
更改一下内容
#打开注释
listeners=PLAINTEXT://:9092
#当前服务器IP为192.168.0.1,你需要修改为外网或局域网可以访问到的服务器IP。
advertised.listeners=PLAINTEXT://192.168.0.1:9092
#broker 的全局唯一编号,不能重复
broker.id=0
#删除 topic 功能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接 Zookeeper 集群地址
zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
4、配置环境变量
> sudo vi /etc/profile
在配置文件最底下插入
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
刷新配置
> source /etc/profile
启动服务器
依次在192.168.0.1、192.168.0.2、192.168.0.3节点上启动服务
启动 Zookeeper
启动Kafka时必须先启动Zookeeper,否则就会出现注册失败连接超时问题
启动命令:bin/zookeeper-server-start.sh config/zookeeper.properties
其中 Zookeeper主要有3个配置:
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
Zookeeper启动失败
1:Zookeeper启动端口为2181,在防火墙内打开此端口
2:系统内存不足,出现该错误
解决办法:vim bin/zookeeper-server-start.sh
将 export KAFKA_HEAP_OPTS=”-Xmx512M -Xms128M” 参数改小即可
启动Kafka
启动命令:bin/kafka-server-start.sh config/server.properties
Kafka启动失败
1、如果你在启动时出现 java.lang.OutOfMemoryError: Map failed
打开 bin/kafka-run-class.sh 文件,
搜索-XX:+DisableExplicitGC,
将这个参数替换为 -XX:+ExplicitGCInvokesConcurrent。
2、系统内存不足,出现该错误
Java OpenJDK(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x000000c0000000, 4096, 0) failed; error='Cannot allocate memory' (errno=12)
89477798
解决办法: vim bin/kafka-server-start.sh
将 export KAFKA_HEAP_OPTS=”-Xmx512M -Xms128M” 参数改小即可
关闭集群
依次在192.168.0.1、192.168.0.2、192.168.0.3节点上关闭服务
> bin/kafka-server-stop.sh stop
Kafka 命令行操作
1.创建 Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
选项说明:
--topic 定义 topic 名
--replication-factor 定义副本数
--partitions 定义分区数
2.删除 topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
3.查看当前服务器中的所有 topic
bin/kafka-topics.sh --zookeeper localhost:2181 --list
4.查看某个 Topic 的详情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
在test_2的主题下,分区0的领导是ides为1的Broers,副本节点为1和2。活着已同步的节点为1和2
在test_2的主题下,分区1的领导是ides为2的Broers,副本节点为2和1。活着已同步的节点为2和1
5.修改分区数
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 6
测试集群
1.创建Topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
2.启动一个消费者
打开一个新的终端下执行此命令
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
这个消费者就是简单的将消息输出到控制台。
3.启动生产者
执行此命令
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
启动后,可以输入内容,然后回车。
即可在刚才的新终端下看到有消息输出
酷客网相关文章:
评论前必须登录!
注册