背景:之前资产信息用网络接口进行数据推送,但是接口推送需要验证而且反应较慢。Kafak中间件提供了另一种可行的数据推送方式,它可以进行消息队列推送,且反应速度快。但是Kafka需部署在公网环境,并进行登录验证,如果部署Kafka后未设置登录验证,会被恶意扫描到,此时向Kafka里面push超过1G的文件,将会直接导致Kafka宕掉。 为了避免Kafka宕掉,建设在使用Kafka过程中配置登录验证,以下是Kafka登录验证的配置过程。 1、修改kafka配置文件: vim /asop/kafka/kafka_2.11-2.1.0/config/server.properties broker.id=0 listeners=SASL_PLAINTEXT://:9092 advertised.listeners=SASL_PLAINTEXT://1.1.1.1:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/asop/kafka/logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 #使用的认证协议 security.inter.broker.protocol=SASL_PLAINTEXT #SASL机制 sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN #完成身份验证的类 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer #如果没有找到ACL(访问控制列表)配置,则允许任何操作。 allow.everyone.if.no.acl.found=false #需要开启设置超级管理员,设置visitor用户为超级管理员 super.users=User:visitor
2、为server创建登录验证文件(可以自主命名文件) 如vim /asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf 文件内容如下 KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username=”visitor” password=”qaz@123″ user_visitor=”qaz@123″; };
4、为consumer和producer创建登录验证文件(可自主命名文件) 如kafka_client_jaas.conf,文件内容如下(如果是程序访问,如springboot访问,可以不配置) vim /asop/kafka/kafka_2.11-2.1.0/config/kafka_client_jaas.conf KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username=”visitor” password=”qaz@123″; };
5、在consumer.properties和producer.properties里分别加上如下配置 vim /asop/kafka/kafka_2.11-2.1.0/config/consumer.properties vim /asop/kafka/kafka_2.11-2.1.0/config/producer.properties security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN
7、分别启动zookeeper和kafka,至此服务端kafka用户登录验证配置完成(先关闭kafka后关闭zookeeper) 关闭服务kafka # /asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-stop.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties 关闭服务zookeeper-3.4.13 /asop/zk/zookeeper-3.4.13/bin/zkServer.sh stop /asop/zk/zookeeper-3.4.13/conf/zoo.cfg 启动服务zookeeper-3.4.13 /asop/zk/zookeeper-3.4.13/bin/zkServer.sh start /asop/zk/zookeeper-3.4.13/conf/zoo.cfg 启动服务kafka #/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties
以上就是本期的全部内容。大家好,我是乐乐,专注IT运维技术研究与分享,更多运维知识请关注乐维社区,如有运维问题也欢迎到社区提问!