京东6.18大促主会场领京享红包更优惠

 找回密码
 立即注册

QQ登录

只需一步,快速开始

linux情况kafka安装及配置方式

2024-11-2 23:41| 发布者: 76a9| 查看: 59| 评论: 0

摘要: 目录下载资源安装zookeeperkafka安装及配置kafka安装(单体)kafka集群配置方式kafka开启kerberos认证kafka自带zookeeper利用kafka常用下令总结下载资源 1、linux情况安装kafka,需要预先预备相干资源,我利用的是kaf
目录

下载资源

1、linux情况安装kafka,需要预先预备相干资源,我利用的是kafka_2.12-2.5.1版本,下载路径为:http://archive.apache.org/dist/kafka/2.5.1/kafka_2.12-2.5.1.tgz,也可以通过下令wget http://archive.apache.org/dist/kafka/2.5.1/kafka_2.12-2.5.1.tgz举行资源获取;

2、获取并安装zookeeper:(以apache-zookeeper-3.6.1-bin.tar.gz为例),官网:https://zookeeper.apache.org/。

3、将下载好的kafka及zookeeper压缩包上传到假造机服务器,放置到/usr/local/目录中:

安装zookeeper

1、解压apache-zookeeper-3.6.1-bin.tar.gz压缩包,并重命名

[code]#进入到压缩包存放路径 cd /usr/local/ #解压 tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz #重命名 mv apache-zookeeper-3.6.1-bin zookeeper-3.6.1[/code]

2、配置启动

[code]#进入配置目录 cd zookeeper-3.6.1/conf/ #复制配置文件,不直接修改源文件用于备份利用 cp zoo_sample.cfg zoo.cfg #编辑zoo.cfg文件 vi zoo.cfg[/code]

修改zoo.cfg中的内容为,主要修改dataDir路径以及端口号:

[code]# The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/usr/local/zookeeper-3.6.1/data/ # the port at which the clients will connect clientPort=2181 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1 ## Metrics Providers # # https://prometheus.io Metrics Exporter #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider #metricsProvider.httpPort=7000 #metricsProvider.exportJvmInfo=true [/code]

3、启动zookeeper

[code]sh /usr/local/zookeeper-3.6.1/bin/zkServer.sh start [/code]

4、查察zookeeper状态

[code]sh /usr/local/zookeeper-3.6.1/bin/zkServer.sh status[/code]

另:制止zookeeper:

[code]sh /usr/local/zookeeper-3.6.1/bin/zkServer.sh stop[/code]

以上为单体zookeeper安装及启动过程,以下为zookeeper集群的搭建方式:

与单体的搭建方式类似,只需要重复类似单体的部署模式,集群最少利用3台,以下大概先容相应的部署方式:

1、首先将解压好的文件夹复制并重命名

[code]#创建集群目录 mkdir zookeeper-cluster #将解压好的zookeeper目录复制到集群目录中 cp -r zookeeper-3.6.1 zookeeper-cluster/ #进入集群目录中 cd zookeeper-cluster/ #重命名复制过来的目录为节点1目录,同理复制出节点2和节点3的目录 mv zookeeper-3.6.1 zookeeper-1 cp -r zookeeper-1 ./zookeeper-2 cp -r zookeeper-1 ./zookeeper-3[/code]

以下为复制完成后的目录结构:

2、分别配置相干的配置文件,注意节点名称及dataDir路径不要冲突

[code]vi zookeeper-1/conf/zoo.cfg vi zookeeper-2/conf/zoo.cfg vi zookeeper-3/conf/zoo.cfg[/code]

3、分别创建myid文件到data目录中

[code]echo "1" > zookeeper-1/data/myid echo "2" > zookeeper-2/data/myid echo "3" > zookeeper-3/data/myid[/code]

修改每个节点中的dataDir,clientPort的值,并增长节点之间的关联属性,以下是节点1的示例,其他节点以此类推:

配置完成后分别启动三个节点:

[code]/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh start /usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start /usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start[/code]

集群启动成功

毗连测试,zkCli.sh脚本可以毗连集群测试是否启动成功:

假如(kerberos服务已经安装并配置完成)开启Kerberos认证需要举行以下操纵(以单体zookeeper为例):

1、生成keytab文件:

[code]#登录kerberos的下令行界面 kadmin.local #生成随机暗码 addprinc -randkey zookeeper/hadoop.test.com@TEST.COM #生成keytab文件 ktadd -k /etc/security/keytabs/zookeeper.keytab zookeeper/hadoop.test.com@TEST.COM #退出下令行 exit #查察生成的keytab文件的用户 klist -ket /etc/security/keytabs/zookeeper.keytab[/code]

以下是实行过程示例:

2、生成jaas文件:

[code]vi /usr/local/zookeeper-3.6.1/conf/jaas.conf[/code]

jaas.conf文件内容,注意keyTab属性位置及principal用户名配置正确

[code]Server{ com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/zookeeper.keytab" principal="zookeeper/hadoop.test.com@TEST.COM" useTicketCache=false; }; Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/etc/security/keytabs/zookeeper.keytab" storeKey=true useTicketCache=false principal="zookeeper/hadoop.test.com@TEST.COM"; }; [/code]

3、修改配置文件zoo.cfg添加配置,在结尾加入以下配置:

[code]authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=3600000[/code]

4、添加java.env文件,并写入相干的内容,注意-Djava.security.auth.login.config为生成的jaas.conf文件的路径:

[code]echo 'export JVMFLAGS=" -Dsun.security.krb5.debug=true -Djava.security.auth.login.config=/usr/local/zookeeper-3.6.1/conf/jaas.conf"' > /usr/local/zookeeper-3.6.1/conf/java.env[/code]

5、启动验证,启动成功,并毗连成功

kafka安装及配置

kafka安装(单体)

1、解压安装包

[code]#进入压缩包存放路径 cd /usr/local/ #解压压缩包 tar -zxvf kafka_2.12-2.5.1.tgz #进入解压后目录中 cd kafka_2.12-2.5.1[/code]

2、修改配置

[code]#修改服务配置文件 vi /usr/local/kafka_2.12-2.5.1/config/server.properties[/code]

配置中的内容需要配置:

[code]listeners=PLAINTEXT://192.168.4.130:9092 zookeeper.connect=192.168.4.130:2181[/code]

配置完成后(前提:zookeeper已正常启动),即可启动

服务端启动下令(不加-daemon前台启动,关闭即制止服务,加了-daemon配景启动):

[code]#启动下令 /usr/local/kafka_2.12-2.5.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.5.1/config/server.properties #制止下令 /usr/local/kafka_2.12-2.5.1/bin/zookeeper-server-stop.sh[/code]

查察服务日志下令:

[code]tail -f /usr/local/kafka_2.12-2.5.1/logs/server.log[/code]

以上是kafka单体的配置及启动方式。

kafka集群配置方式

本次主要记载同一台服务器上搭建集群,类似于单体的搭建方式,我们只需要配置每个节点的配置文件,然后分别启动即可,假如是分差异的服务器搭建,类似于每台服务器上搭建单体,然后在配置文件中增长相应集群相干的配置项即可利用了,以下默认已经将压缩包解压好后的操纵。

1、复制server.properties文件并重命名,复制出3(集群最好大于等于三个节点)份来

[code]#复制配置文件 cp /usr/local/kafka_2.12-2.5.1/config/server.properties /usr/local/kafka_2.12-2.5.1/config/server-1.properties cp /usr/local/kafka_2.12-2.5.1/config/server.properties /usr/local/kafka_2.12-2.5.1/config/server-2.properties cp /usr/local/kafka_2.12-2.5.1/config/server.properties /usr/local/kafka_2.12-2.5.1/config/server-3.properties #编辑配置文件 vi /usr/local/kafka_2.12-2.5.1/config/server-1.properties vi /usr/local/kafka_2.12-2.5.1/config/server-2.properties vi /usr/local/kafka_2.12-2.5.1/config/server-3.properties[/code]

之后分别配置复制出来的server-1.properties,server-2.properties,server-3.properties三个配置文件,分别配置以下配置

[code]#节点id,差异的节点用差异的数字表现 broker.id=1 #对外的ip及端口,端口号每个文件不要用同一个,我利用的分别是9091,9092,9093 listeners=PLAINTEXT://192.168.4.130:9091 #数据存放位置,每个节点一个如/kafka-logs-1,/kafka-logs-2,/kafka-logs-3等,差异节点利用文件不可重复,假如重复了轻易启动失败 log.dirs=/usr/local/kafka_2.12-2.5.1/data/cluster/kafka-logs-1 #填zookeeper的地址,多个用,隔开 zookeeper.connect=192.168.4.130:2181[/code]

分别配置好之后可以利用启动下令分别启动相干节点

[code]#启动节点1 /usr/local/kafka_2.12-2.5.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.5.1/config/server-1.properties #启动节点2 /usr/local/kafka_2.12-2.5.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.5.1/config/server-2.properties #启动节点3 /usr/local/kafka_2.12-2.5.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.5.1/config/server-3.properties #查察服务日志 tail -f /usr/local/kafka_2.12-2.5.1/logs/server.log[/code]

启动成功日志:

注:有时存在kafka启动失败报zookeeper毗连超时拒绝毗连时,可能引起的原因是防火墙没有关闭,关闭防火墙的下令,也有可能是/etc/hosts文件配置不对引起的,遇到该题目可以多向思量。

[code]#方式一 #制止防火墙 service firewalld stop #禁用防火墙 systemctl disable firewalld #方式二: chkconfig iptables off[/code]

另:假如之前启动过zookeeper或者kafka,但是数据目录没有清除过的话也会影响我们的启动,一定要细致核对好。

kafka开启kerberos认证

假如我们搭建的kafka(单体或集群)需要开启kerberos认证,可以在安装的时间这样配置:

1、生成keytab文件:

[code]#登录kerberos的下令行界面 kadmin.local #生成随机暗码 addprinc -randkey kafka/hadoop.test.com@TEST.COM #生成keytab文件 ktadd -k /etc/security/keytabs/kafka.keytab kafka/hadoop.test.com@TEST.COM #退出下令行 exit #查察生成的keytab文件的用户 klist -ket /etc/security/keytabs/kafka.keytab[/code]

以下是生成过程示例:

2、生成jaas文件:

[code]vi /usr/local/kafka_2.12-2.5.1/config/jaas.conf[/code]

jaas.conf文件内容,注意keyTab属性位置及principal用户名配置正确

[code]KafkaServer{ com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true serviceName="kafka" keyTab="/etc/security/keytabs/kafka.keytab" principal="kafka/hadoop.test.com@TEST.COM"; }; KafkaClient{ com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true serviceName="kafka" keyTab="/etc/security/keytabs/kafka.keytab" principal="kafka/hadoop.test.com@TEST.COM" userTicketCache=true; }; Client{ com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true serviceName="kafka" keyTab="/etc/security/keytabs/kafka.keytab" principal="kafka/hadoop.test.com@TEST.COM" userTicketCache=true; };[/code]

3、修改kafka的配置文件,假如是单体仅需要修改一个,假如是集群,则需要修改每个节点对应的配置文件:

[code]vi /usr/local/kafka_2.12-2.5.1/config/server.properties[/code]

配置文件中添加(配置)以下的属性(非kerberos配置时需要的配置默认需要配置好)

[code]listeners=SASL_PLAINTEXT://172.168.4.130:9093 advertised.listeners=SASL_PLAINTEXT://172.168.4.130:9093 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=GSSAPI sasl.enabled.mechanisms=GSSAPI sasl.kerberos.service.name=kafka kafka.security.protocol=SASL_PLAINTEXT super.users=User:kafka[/code]

4、修改kafka服务启动脚本,配置相干的jaas文件路径

[code]vi /usr/local/kafka_2.12-2.5.1/bin/kafka-server-start.sh[/code]

添加以下的内容:

[code]export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/usr/local/kafka_2.12-2.5.1/config/jaas.conf"[/code]

添加示例:

注意:假如kafka毗连时,生产者或消费者毗连开启kerberos认证的kafka服务器时,需要在相应的脚本中也同样添加该配置

[code]#生产者毗连脚本配置 vi /usr/local/kafka_2.12-2.5.1/bin/kafka-console-producer.sh #消费者毗连脚本配置 vi /usr/local/kafka_2.12-2.5.1/bin/kafka-console-consumer.sh #topic毗连脚本配置 vi /usr/local/kafka_2.12-2.5.1/bin/kafka-topics.sh[/code]

同时消费者或生产者的配置文件中需要增长以下配置:

[code]security.protocol=SASL_PLAINTEXT sasl.mechanism=GSSAPI sasl.kerberos.service.name=kafka[/code]

添加示例(以生产者为例):

启动kafka服务,毗连并利用,参考平凡kafka启动及毗连相干下令。

kafka自带zookeeper利用

注:kafka安装包中也自带了zookeeper,假如不想安装zookeeper,可以利用kafka安装包中自带的zookeeper。

假如利用kafka安装包中带的zookeeper,需要配置解压后目录中的zookeeper.properties

[code]vi /usr/local/kafka_2.12-2.5.1/config/zookeeper.properties[/code]

假如不需要修改端口可以默认不修改,利用下令启动自带zookeeper:

[code]#启动zookeeper /usr/local/kafka_2.12-2.5.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.12-2.5.1/config/zookeeper.properties #制止zookeeper /usr/local/kafka_2.12-2.5.1/bin/zookeeper-server-stop.sh[/code]

假如zookeeper需要开启kerberos认证需要给zookeeper.properties添加配置:

[code]authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=3600000[/code]

在启动脚本zookeeper-server-start.sh中添加如下配置,注意jaas.conf文件的路径,jaas文件生成方式同上。

[code]export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/usr/local/zookeeper-3.6.1/conf/jaas.conf"[/code]

启动下令如上。

kafka常用下令

[code]#创建主题 /usr/local/kafka_2.12-2.5.1/bin/kafka-topics.sh --create --zookeeper 192.168.4.130:2181 --replication-factor 1 --partitions 3 --topic test01 #查察主题列表 /usr/local/kafka_2.12-2.5.1/bin/kafka-topics.sh --list --zookeeper 192.168.4.130:2181/kafka #生产者毗连并生产数据 /usr/local/kafka_2.12-2.5.1/bin/kafka-console-producer.sh --broker-list 192.168.4.130:9092 --topic test01 --producer.config /usr/local/kafka_2.12-2.5.1/config/producer.properties #消费者毗连并消费数据 /usr/local/kafka_2.12-2.5.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.4.130:9092 --topic test01 --from-beginning --consumer.config /usr/local/kafka_2.12-2.5.1/config/consumer.properties #kafka添加消息写入partition时间戳的方法 #Kafka消息的时间戳,在消息中增长了一个时间戳字段和时间戳范例。目前支持的时间戳范例有两种: CreateTime 和 LogAppendTime 前者表现producer创建这条消息的时间;后者表现broker吸收到这条消息的时间(严格来说,是leader broker将这条消息写入到log的时间) /usr/local/kafka_2.12-2.5.1/bin/kafka-topics.sh --alter --topic test01 --zookeeper 192.168.4.130:2181 --config message.timestamp.type=LogAppendTime /usr/local/kafka_2.12-2.5.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.4.130:9092 --topic test01 --from-beginning --consumer.config /usr/local/kafka_2.12-2.5.1/config/consumer.properties --property print.timestamp=true #删除主题内的消息:kafka-delete-records --bootstrap-server <broker-host:port> --offset-json-file offsets.json #–bootstrap-server:需要毗连的 brokers 地址; #–offset-json-file:包罗删除配置的 Json 文件。 /usr/local/kafka_2.12-2.5.1/bin/kafka-delete-records.sh --bootstrap-server 192.168.4.130:9092 --offset-json-file /usr/local/kafka_2.12-2.5.1/remove.json #删除附加:移除kerberos开启的server中的数据,同样需要在相干配置文件中配置kerberos相干的配置,以及脚本中增长相干的配置 /usr/local/kafka_2.12-2.5.1/bin/kafka-delete-records.sh --bootstrap-server 192.168.4.130:9092 --command-config /usr/local/kafka_2.12-2.5.1/config/delete-kerb.properties --offset-json-file /usr/local/kafka_2.12-2.5.1/remove.json[/code]

关于移除数据的remove.json配置文件内容:

[code]{ "partitions": [ {"topic": "test01", "partition": 0, "offset": -1} ], "version":1 }[/code]
  • topic:待删除数据主题
  • partition:待删除的分区
  • offset:删除起始偏移量,设置为 -1,表现将删除主题中所有数据。

总结

以上为个人经验,盼望能给各人一个参考,也盼望各人多多支持脚本之家。


来源:https://www.jb51.net/server/32835274g.htm
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
关闭

站长推荐上一条 /6 下一条

QQ|手机版|小黑屋|梦想之都-俊月星空 ( 粤ICP备18056059号 )|网站地图

GMT+8, 2025-8-16 02:01 , Processed in 0.035463 second(s), 18 queries .

Powered by Mxzdjyxk! X3.5

© 2001-2025 Discuz! Team.

返回顶部