Kafka应用Demo:指派分区订阅消息消费

环境准备

 Kafka环境搭建和生产者样例代码与《Kafka应用Demo:按主题订阅消费消息》相同。

消费者代码样例

public class KafkaConsumerService {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);
    private static final String NEO_TOPIC = "elon-topic";

    Properties properties = new Properties();
    private KafkaConsumer consumer = null;

    public KafkaConsumerService() {
        TopicPartition partition0 = new TopicPartition(NEO_TOPIC, 0);
        TopicPartition partition1 = new TopicPartition(NEO_TOPIC, 1);

        properties.put("bootstrap.servers","192.168.5.128:9092");  // 指定 Broker
        properties.put("group.id", "neo2");              // 指定消费组群 ID
        properties.put("max.poll.records", "1");
        properties.put("enable.auto.commit", "false");
        properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象
        properties.put("value.deserializer", StringDeserializer.class);  // 将 value 的字节数组转成 Java 对象

        consumer = new KafkaConsumer<String, String>(properties);
        List<TopicPartition> partitionList = new ArrayList<>();
        partitionList.add(partition1);
        partitionList.add(partition0);
        consumer.assign(partitionList);
        new Thread(this::receiveMessage).start();
    }

    public void receiveMessage() {
        try {
            while (true) {
                synchronized (this) {
                    ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                    LOGGER.info("Fetch record num:{}", records.count());
                    for (ConsumerRecord<String,String> record: records) {
                        String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value());
                        LOGGER.info("Received:" + info);
                        Thread.sleep(10000);
                    }
                    consumer.commitSync();
                }
            }
        } catch (Exception e){

        } finally {
            consumer.close();
        }
    }
}

 样例代码中的consumer.assign(partitionList)绑定了主题下的0号分区和1号分区接收消息。指派分区的方式和按主题订阅的方式不能混用,也就是说一个消费者实例只能选择一种方式订阅。

分析

 如果我们同时启动两个conumer实例,指派订阅相同主题和相同分区的消息。可以看到这两个实例收到了相同的消息,哪怕这两个消费者配置了相同的分组,这一点是与按主题订阅消息不同的。

在这里插入图片描述

 根据官方指导文档的说法,如果使用assign绑定分区订阅消息,不同的消费者实例是相互独立的(编者注:相当于广播消息)。为了避免offset提交导致冲突,应该为不同消费者实例配置不同的分组。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/608614.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

word图片水印

一、word中旧水印如何删除 打开word模板&#xff0c;想要删除旧水印&#xff0c;如下图所示操作&#xff0c;但是旧水印删除不掉。 以为上传新水印图片会替换掉旧水印&#xff0c;结果显示了2个水印&#xff0c;要怎么删除呢&#xff1f; 如下截图所示&#xff0c;双击打开页…

vue+element的表格(el-table)排班情况表(2024-05-09)

vueelement的表格&#xff08;el-table&#xff09;排班情况&#xff0c;增删查改等简单功能 代码&#xff1a; <template><!-- 表格 --><div class"sedules"><el-header><el-date-pickerv-model"monthValue2"type"month…

YOLOv8网络结构介绍

将按照YOLOv8目标检测任务、实例分割任务、关键点检测任务以及旋转目标检测任务的顺序来介绍&#xff0c;主要内容也是在目标检测任务中介绍&#xff0c;其他任务也只是Head层不相同。 1.YOLOv8_det网络结构 首先&#xff0c;YOLOv8网络分成了三部分&#xff0c;分别是主干网络…

制鞋5G智能工厂数字孪生可视化平台,推进行业数字化转型

制鞋5G智能工厂数字孪生可视化平台&#xff0c;推进行业数字化转型。随着科技的飞速发展&#xff0c;5G技术与智能制造的结合正成为推动制鞋行业数字化转型的重要力量。制鞋5G智能工厂数字孪生可视化平台&#xff0c;不仅提高了生产效率&#xff0c;还优化了资源配置&#xff0…

【Linux系统编程】31.pthread_detach、线程属性

目录 pthread_detach 参数pthread 返回值 测试代码1 测试结果 pthread_attr_init 参数attr 返回值 pthread_attr_destroy 参数attr 返回值 pthread_attr_setdetachstate 参数attr 参数detachstate 返回值 测试代码2 测试结果 线程使用注意事项 pthread_deta…

SpringCloud:认识微服务

程序员老茶 &#x1f648;作者简介&#xff1a;练习时长两年半的Java up主 &#x1f649;个人主页&#xff1a;程序员老茶 &#x1f64a; P   S : 点赞是免费的&#xff0c;却可以让写博客的作者开心好久好久&#x1f60e; &#x1f4da;系列专栏&#xff1a;Java全栈&#…

NSSCTF | [SWPUCTF 2021 新生赛]easy_sql

打开题目&#xff0c;提示输入一些东西&#xff0c;很显眼的可以看到网站标题为“参数是wllm” 首先单引号判断闭合方式 ?wllm1 报错了&#xff0c;可以判断为单引号闭合。 然后判断字节数&#xff08;注意‘--’后面的空格&#xff09; ?wllm1 order by 3-- 接着输入4就…

[Linux][网络][网络层][IP协议]详细讲解

目录 0.基本概念1.IP协议头格式2.IP分片与组装1.为什么要分片&#xff1f;2.分片后谁来组装&#xff1f;3.这个分片操作传输层知道吗&#xff1f;4.如何识别报文和报文的不同&#xff1f;5.接收端&#xff0c;如何得知报文是独立的还是一个分片&#xff1f;6.如何区别哪些分片是…

UDP和TCP协议比较,TOE技术

如今在某些方面TCP超越UDP的主要原因如下 在硬件层面的TOE(TCP Offload Engine)功能&#xff0c;将越来越多的TCP功能卸载到网卡上。它极大地提升了TCP的性能&#xff0c;使其在高吞吐量场景下的表现更为出色。近年TCP的拥塞控制算法实现了显著进步。这些新算法显著提高了TCP在…

macos安装mysql一直卡在安装成功那个页面选项的解决办法

问题描述&#xff1a; 我安装的是比较新的版本8.0.37&#xff0c;安装过程中一直卡在安装那个选项上&#xff0c;且页面提示安装成功了&#xff0c;但就是死活不往下面的配置选项那一步走。 解决办法&#xff1a; 1.首先清理掉之前的mysql sudo rm -rf /usr/local/mysql2.然…

软件技术主要学什么课程

软件技术专业主要学习的课程和内容有编程语言、数据结构与算法、数据库技术等&#xff0c;以下是上大学网( www.sdaxue.com)整理的软件技术主要学什么课程&#xff0c;供大家参考&#xff01; 编程语言&#xff1a;掌握一种或多种编程语言&#xff0c;如C#、Java、Python、C等&…

Python 2.x与Python 3.x:初学者该如何选择?

自从Python在1994年首次发布以来,已经经历了多个版本的更新和改进。Python 1.x虽然在发展史上具有重要意义,但早已过时,不再用于实际开发。2000年发布的Python 2.x和2008年发布的Python 3.x则成为了Python家族中最常用的两个版本,形成了一个重要的分界线。特别是Python 3.x…

GPU通用计算介绍

谈到 GPU &#xff08;Graphics Processing Unit&#xff0c;图形显示卡&#xff09;大多数人想到的是游戏、图形渲染等这些词汇&#xff0c;图形处理确实是 GPU 的一大应用场景。然而人们也早已关注到它在通用计算上的巨大潜力&#xff0c;并提出了 GPGPU (General-purpose co…

本地搭建hydra服务用go以验证oidc流程

目录 1、docker搭建hydra&#xff0c;环境配置&#xff1a; 2、搭建完成后服务调用&#xff1a; 2.1保证服务正常启动&#xff1a; 2.2 通过postman调用&#xff0c;获取client_id&#xff1a; 2.3 通过client_id&#xff0c;实现oauth2/auth调用 3. 通过go语言实现oidc验…

jsp 实验12 servlet

一、实验目的 掌握怎样在JSP中使用javabean 二、实验项目内容&#xff08;实验题目&#xff09; 编写代码&#xff0c;掌握servlet的用法。【参考课本 上机实验1 】 三、源代码以及执行结果截图&#xff1a; 源代碼&#xff1a; inputVertex.jsp&#xff1a; <% page lang…

免费思维13招之四:主副型思维

免费思维13招之四:主副型思维 本节,给你分享一下产品型思维的第二种子思维:主副型思维 什么是主副型思维呢?传统的主副型思维是指对企业的核心、利润最高的产品进行收费,一些附加品、延伸产品进行让利,赠送给客户。 但是这早已过时了,现在升级之后的产品型思维,就是将…

​​​​【收录 Hello 算法】4.4 内存与缓存

目录 4.4 内存与缓存 4.4.1 计算机存储设备 4.4.2 数据结构的内存效率 4.4.3 数据结构的缓存效率 4.4 内存与缓存 在本章的前两节中&#xff0c;我们探讨了数组和链表这两种基础且重要的数据结构&#xff0c;它们分别代表了“连续存储”和“分散存储”两种物理…

如何防止WordPress网站内容被抓取

最近在检查网站服务器的访问日志的时候&#xff0c;发现了大量来自同一个IP地址的的请求&#xff0c;用站长工具分析确认了我的网站内容确实是被他人的网站抓取了&#xff0c;我第一时间联系了对方网站的服务器提供商投诉了该网站&#xff0c;要求对方停止侵权行为&#xff0c;…

16【PS Aseprite 作图】图像从Aseprite传输到PS

【内容背景】Aseprite很适合做像素图&#xff0c;有一个“完美像素”的选项&#xff0c;就不用在PS里面慢慢修线&#xff0c;能够省事很多 【具体操作】 勾选完美像素 Aseprite里面的“完美像素”能够减少修线的步骤&#xff0c;在“作图”的时候一定要注意勾选 导出 选择…

【全开源】Java线上云酒馆单预约系统源码小程序源码

核心功能&#xff1a; 座位预约&#xff1a;用户可以通过该系统提前预约酒馆的座位&#xff0c;选择就餐时间和人数&#xff0c;以及特殊座位&#xff08;如包厢、卡座等&#xff09;&#xff0c;确保到店后有合适的座位。酒水点餐&#xff1a;用户可以在预约的同时&#xff0…
最新文章