博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
PHP下kafka的常用脚本实践
阅读量:6275 次
发布时间:2019-06-22

本文共 8530 字,大约阅读时间需要 28 分钟。

阅读本教程前最好先尝试阅读:

自带命令实践

尝试实践的kafka知识:

  1. 创建话题
  2. 生产消息
  3. 消费消息
  4. 话题信息
  5. 获取消费组
  6. 获取消费组的offset

自带的命令

# kafka安装目录的bin目录下# * 代表我们会使用的脚本connect-distributed.sh        kafka-log-dirs.sh                    kafka-streams-application-reset.shconnect-standalone.sh         kafka-mirror-maker.sh                kafka-topics.sh*kafka-acls.sh                 kafka-preferred-replica-election.sh  kafka-verifiable-consumer.shkafka-broker-api-versions.sh  kafka-producer-perf-test.sh          kafka-verifiable-producer.shkafka-configs.sh              kafka-reassign-partitions.sh         trogdor.shkafka-console-consumer.sh*    kafka-replay-log-producer.sh         windowskafka-console-producer.sh*    kafka-replica-verification.sh        zookeeper-security-migration.shkafka-consumer-groups.sh*     kafka-run-class.sh                   zookeeper-server-start.shkafka-consumer-perf-test.sh   kafka-server-start.sh                zookeeper-server-stop.shkafka-delegation-tokens.sh    kafka-server-stop.sh                 zookeeper-shell.sh*kafka-delete-records.sh       kafka-simple-consumer-shell.sh

创建话题(kafka-topics.sh)

# 创建1个分区1个副本的test话题,这里是副本其实可以理解为broker里至少拥有数量,must >=1# --zookeeper localhost:2181 (kafka的默认端口:2181)bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test# 创建2个分区1个副本的test02话题bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test02# 列出所有话题bin/kafka-topics.sh --list --zookeeper localhost:2181__consumer_offsetstesttest02# 注意这里的__consumer_offsets是kafka默认创建的,用于存储kafka消费记录的话题,我们暂时不用理会# 列出具体话题的信息bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic testTopic:test      PartitionCount:1        ReplicationFactor:1     Configs:        Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test02Topic:test02    PartitionCount:2        ReplicationFactor:1     Configs:        Topic: test02   Partition: 0    Leader: 0       Replicas: 0     Isr: 0        Topic: test02   Partition: 1    Leader: 0       Replicas: 0     Isr: 0# 从上面的显示 我们可以发现,第一句是显示总体信息,下面缩进显示的是分区信息

消费者(kafka-console-consumer.sh)

# 启动一个消费组消费,这里我们需要开启一个shell终端,因为会等待输出bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning# 等待输出

生产者(kafka-console-consumer.sh)

# 这里我们需要开启一个shell终端,因为会等待输入bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test# 待我们输入消息 等待出现>> msg01> msg02> msg03#注意观察上面的消费者终端,自动输出了我们的消息msg01msg02msg03

查看消费组

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --listNote: This will not show information about old Zookeeper-based consumers.console-consumer-25379console-consumer-73410console-consumer-27127console-consumer-61887console-consumer-61324# 这里我们再来起一个消费者再次输出(因为之前的消费者我们不知道最新的这次消费组id是多少)bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --listNote: This will not show information about old Zookeeper-based consumers.console-consumer-25379console-consumer-73410console-consumer-27127console-consumer-39416 # 这个是我们新起的消费组id,下面我们根据这个消费组来做实践console-consumer-61887console-consumer-61324# 查看消费组的具体信息bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-39416Note: This will not show information about old Zookeeper-based consumers.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-IDtest            0          9               9               0               consumer-1-94afec29-5042-4108-8619-ba94812f10a8 /127.0.0.1      consumer-1# 查看离线的console-consumer-25379消费组bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-25379Note: This will not show information about old Zookeeper-based consumers.Consumer group 'console-consumer-25379' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDtest            0          5               9               4               -               -               -# 查看离线的console-consumer-27127消费组bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-27127Note: This will not show information about old Zookeeper-based consumers.Consumer group 'console-consumer-27127' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDtest            0          6               9               3               -               -               -# 这里我们发现我们每次都会生成一个消费组一个消费者,不方便我们做一个消费组多个消费者测试

启动一个消费组多个消费者

cp config/consumer.properties config/consumer_g1.propertiesvim config/consumer_g1.properties# 修改消费组名group.id=test-consumer-group => group.id=test-g1bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer_g1.propertiesmsg01msg02msg03bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer_g1.properties# 无输出(因为test话题我们只有一个分区,一个分区只能被同个消费组下面的一个消费者消费,所以这个就闲置了)# 查看消费组bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --listNote: This will not show information about old Zookeeper-based consumers.console-consumer-25379console-consumer-73410console-consumer-27127console-consumer-39416test-g1console-consumer-61887console-consumer-61324# 查看test-g1消费组bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-g1Note: This will not show information about old Zookeeper-based consumers.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-IDtest            0          9               9               0               consumer-1-43922b0c-34e0-47fe-b597-984c9e6a2884 /127.0.0.1      consumer-1# 下面我们再开启test02的2个消费组看看情况# 我们再为test02话题启动2个test-g1消费组的消费者bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test02 --from-beginning --consumer.config config/consumer_g1.propertiesbin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test02 --from-beginning --consumer.config config/consumer_g1.properties# 查看test-g1消费组bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-g1Note: This will not show information about old Zookeeper-based consumers.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-IDtest02          0          0               0               0               consumer-1-90ccc960-557a-46ab-a799-58c35ee670d8 /127.0.0.1      consumer-1test02          1          0               0               0               consumer-1-c6d79321-8212-4594-8a11-353f684c54fc /127.0.0.1      consumer-1test            0          9               9               0               consumer-1-7a2706f7-a206-4c29-ae1f-3726ad21af96 /127.0.0.1      consumer-1# 这里你可以在话题test产生一个消息看下,然后再test02再多产生几条消息看下,你会发现test02的2个消费组几乎是负载的消费了消息

消费组的一些信息命令

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verboseNote: This will not show information about old Zookeeper-based consumers.CONSUMER-ID                                     HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENTconsumer-1-b8eeb8bc-6aa6-439b-84d8-0fcbed4a2899 /127.0.0.1      consumer-1      1               test02(1)consumer-1-7109f789-a5cf-4862-94d0-976146dbc769 /127.0.0.1      consumer-1      1               test(0)consumer-1-90ccc960-557a-46ab-a799-58c35ee670d8 /127.0.0.1      consumer-1      1               test02(0)bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --stateNote: This will not show information about old Zookeeper-based consumers.COORDINATOR (ID)          ASSIGNMENT-STRATEGY       STATE                #MEMBERSlocalhost:9092 (0)        range                     Stable               3bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group console-consumer-25379

zookeeper(zookeeper-shell.sh)

连接

bin/zookeeper-shell.sh 127.0.0.1:2181Connecting to 127.0.0.1:2181Welcome to ZooKeeper!JLine support is disabledWATCHER::WatchedEvent state:SyncConnected type:None path:null

常用命令

ls /[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]

转载地址:http://magpa.baihongyu.com/

你可能感兴趣的文章
Android开发 - 掌握ConstraintLayout(九)分组(Group)
查看>>
springboot+logback日志异步数据库
查看>>
Typescript教程之函数
查看>>
Android 高效安全加载图片
查看>>
vue中数组变动不被监测问题
查看>>
3.31
查看>>
类对象定义 二
查看>>
收费视频网站Netflix:用户到底想要“点”什么?
查看>>
MacOS High Sierra 12 13系统转dmg格式
查看>>
关于再次查看已做的多选题状态逻辑问题
查看>>
动态下拉菜单,非hover
查看>>
政府安全资讯精选 2017年第十六期 工信部发布关于规范互联网信息服务使用域名的通知;俄罗斯拟建立备用DNS;Google打击安卓应用在未经同意情况下收集个人信...
查看>>
简单易懂的谈谈 javascript 中的继承
查看>>
多线程基础知识
查看>>
iOS汇编基础(四)指针和macho文件
查看>>
Laravel 技巧锦集
查看>>
Android 使用 ViewPager+RecyclerView+SmartRefreshLayout 实现顶部图片下拉视差效果
查看>>
Flutter之基础Widget
查看>>
写给0-3岁产品经理的12封信(第08篇)——产品运营能力
查看>>
ArcGIS Engine 符号自动化配置工具实现
查看>>