如何存储队列位置信息

         实际运行中的系统,难免会遇到重新消费某条消息、跳过一段时间内的消息等情况。这些异常情况的处理,都和Offset有关。本节主要分析Offset的存储位置,以及如何根据需要调整Offset的值。

        首先来明确一下Offset的含义,RocketMQ中,一种类型的消息会放到一个Topic里,为了能够并行,一般一个Topic会有多个Message Queue(也可以设置成一个),Offset是指某个Topic下的一条消息在某个Message Queue里的位置,通过Offset的值可以定位到这条消息,或者指示Consumer从这条消息开始向后继续处理。

如图1所示是Offset的类结构,主要分为本地文件类型和Broker代存的类型两种。对于DefaultMQPushConsumer来说,默认是CLUSTERING模式,也就是同一个Consumer group里的多个消费者每人消费一部分,各自收到的消息内容不一样。这种情况下,由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore结构。


 

图1 OffsetStore的类结构

在DefaultMQPushConsumer里的BROADCASTING模式下,每个Consumer都收到这个Topic的全部消息,各个Consumer间相互没有干扰,RocketMQ使用LocalFileOffsetStore,把Offset存到本地。

OffsetStore使用Json格式存储,简洁明了,下面是个例子:

代码清单1 Offsetstore的内容示例

{"OffsetTable":{{"brokerName":"localhost", "QueueId":1,"Topic":"broker1" }: 1,{ "brokerName":"localhost", "QueueId":2,"Topic":"broker1" }:2, { "brokerName":"localhost", "QueueId":0, "Topic":"broker1" }:3 } }
 

在使用DefaultMQPushConsumer的时候,我们不用关心OffsetStore的事,但是如果PullConsumer,我们就要自己处理OffsetStore了。在前面文章中PullConsumer示例中,代码里把Offset存到了内存,没有持久化存储,这样就可能因为程序的异常或重启而丢失Offset,在实际应用中不推荐这样做。接下来给出在磁盘存储Offset的示例程序,参照LocalFileOffsetStore的源码编写,如代码清单2所示。

代码清单2 自定义持久存储OffsetStore

public class LocalOffsetStoreExt {
    private final String groupName;
    private final String storePath;
    private ConcurrentMap<MessageQueue, AtomicLong> OffsetTable =
            new ConcurrentHashMap<MessageQueue, AtomicLong>();
    public LocalOffsetStoreExt(String storePath, String groupName) {
        this.groupName = groupName;
        this.storePath = storePath;
    }
    public void load() {
        OffsetSerializeWrapper OffsetSerializeWrapper = this.readLocal-Offset();
        if (OffsetSerializeWrapper != null && OffsetSerializeWrapper.getOffsetTable() != null) {
            OffsetTable.putAll(OffsetSerializeWrapper.getOffsetTable());
            for (MessageQueue mq : OffsetSerializeWrapper.getOffsetTable().keySet()) {
                AtomicLong Offset = OffsetSerializeWrapper.getOffset-Table().get(mq);
                System.out.printf("load Consumer's Offset, {} {} {} \n", this.groupName, mq, Offset.get());
            }
        }
    }
    public void updateOffset(MessageQueue mq, long Offset) {
        if (mq != null) {
            AtomicLong OffsetOld = this.OffsetTable.get(mq);
            if (null == OffsetOld) {
                this.OffsetTable.putIfAbsent(mq, new AtomicLong(Offset));
            } else {
                OffsetOld.set(Offset);
            }
        }
    }
    public long readOffset(final MessageQueue mq) {
        if (mq != null) {
            AtomicLong Offset = this.OffsetTable.get(mq);
            if (Offset != null) {
                return Offset.get();
            }
        }
        return 0;
    }
    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;
        OffsetSerializeWrapper OffsetSerializeWrapper = new Offset-SerializeWrapper();
        for (Map.Entry<MessageQueue, AtomicLong> entry : this.OffsetTable.
            entrySet()) {
            if (mqs.contains(entry.getKey())) {
                AtomicLong Offset = entry.getValue();
                OffsetSerializeWrapper.getOffsetTable().put(entry.getKey(), Offset);
            }
        }
        String jsonString = OffsetSerializeWrapper.toJson(true);
        if (jsonString != null) {
            try {
                MixAll.string2File(jsonString, this.storePath);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    private OffsetSerializeWrapper readLocalOffset() {
        String content = null;
        try {
            content = MixAll.file2String(this.storePath);
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (null == content || content.length() == 0) {
            return null;
        } else {
            OffsetSerializeWrapper OffsetSerializeWrapper = null;
            try {
                OffsetSerializeWrapper =
                        OffsetSerializeWrapper.fromJson(content, Offset-SerializeWrapper.class);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return OffsetSerializeWrapper;
        }
    }
}

 

了解OffsetStore的存储机制以后,我们看看如何设置Consumer读取消息的初始位置。DefaultMQPushConsumer类里有个函数用来设置从哪儿开始消费消息:比如setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET),这个语句设置从最小的Offset开始读取。如果从队列开始到感兴趣的消息之间有很大的范围,用CONSUME_FROM_FIRST_OFFSET参数就不合适了,可以设置从某个时间开始消费消息,比如Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP),Consumer.setConsumeTimestamp("20131223171201"),时间戳格式是精确到秒的。注意设置读取位置不是每次都有效,它的优先级默认在Offset Store后面,比如在DefaultMQPushConsumer的BROADCASTING方式下,默认是从Broker里读取某个Topic对应ConsumerGroup的Offset,当读取不到Offset的时候,ConsumeFromWhere的设置才生效。大部分情况下这个设置在Consumer Group初次启动时有效。如果Consumer正常运行后被停止,然后再启动,会接着上次的Offset开始消费,ConsumeFromWhere的设置无效。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/186331.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux每日智囊

每日分享三个Linux命令&#xff0c;悄悄培养读者的Linux技能。 info 作用 查看程序、库和系统文档的详细信息。 info命令和man命令都用于查看命令和程序的帮助信息&#xff0c;区别如下&#xff1a; man命令&#xff1a;是最常用的命令之一&#xff0c;用于查看Linux系统上…

Apipost-Helper:IDEA中的类postman工具

今天给大家推荐一款IDEA插件&#xff1a;Apipost-Helper-2.0&#xff0c;写完代码IDEA内一键生成API文档&#xff0c;无需安装、打开任何其他软件&#xff1b;写完代码IDEA内一键调试&#xff0c;无需安装、打开任何其他软件&#xff1b;生成API目录树&#xff0c;双击即可快速…

如何以管理员的身份运行Powershell

大全&#xff01;珍藏 方式一&#xff1a;在Cortana搜索栏中打开带管理员权限的PowerShell Windows 10的任务栏自带了搜索。或者开始菜单选搜索只需在搜索框中输入powershell。 在出来的搜索结果中右击Windows PowerShell&#xff0c;然后选择以管理员方式运行。 随后会弹出UA…

React进阶之路(二)-- 组件通信、组件进阶

文章目录 组件通信组件通信的意义父传子实现props说明子传父实现兄弟组件通信跨组件通信Context通信案例 React组件进阶children属性props校验组件生命周期 组件通信 组件通信的意义 组件是独立且封闭的单元&#xff0c;默认情况下组件只能使用自己的数据&#xff08;state&a…

Go cobra简介

当你需要为你的 Go 项目创建一个强大的命令行工具时&#xff0c;你可能会遇到许多挑战&#xff0c;比如如何定义命令、标志和参数&#xff0c;如何生成详细的帮助文档&#xff0c;如何支持子命令等等。为了解决这些问题&#xff0c;github.com/spf13/cobra 就可以派上用场。 g…

SpringCloud——服务网关——GateWay

1.GateWay是什么&#xff1f; gateway也叫服务网关&#xff0c;SpringCloud GateWay使用的是Webflux中的reactor-netty响应式编程组件&#xff0c;底层使用了Netty通讯框架。 gateway的功能有反向代理、鉴权、流量控制、熔断、日志监控...... 2.为什么不使用Zuul&#xff1f…

智慧社区大屏:连接社区生活的数字桥梁

随着科技的不断发展&#xff0c;智慧社区已经不再只是未来的概念&#xff0c;它已经在我们的眼前悄然崭露头角。智慧社区是一种基于数字技术的社区管理和生活方式&#xff0c;旨在提高社区的安全性、便利性和生活质量。而在这个数字化的社区中&#xff0c;智慧社区大屏起到了连…

MYSQL函数,一篇文章看完!

做程序员的谁会离得开数据库呢&#xff1f;今天就来分享一下我整理的MySQL的常用函数&#xff0c;基本上囊括了平时要用的函数&#xff0c;它们已经陪我走过了不少年头了&#xff0c;风里来雨里去&#xff0c;缝缝补补又几年&#xff0c;希望能帮到你们&#xff01; 如果数据库…

HTML+CSS、Vue+less+、HTML+less 组件封装实现二级菜单切换样式跑(含全部代码)

一、HTMLCSS二级菜单 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><title>Document</title><…

4K壁纸下载器,多种风格壁纸,一键批量下载到本地,桌面壁纸,高清壁纸,壁纸下载

一个桌面壁纸爬虫工具&#xff0c;该工具可以从内置的多个壁纸网站爬取高清壁纸&#xff0c;并支持将壁纸一键下载到本地&#xff0c;真正实现了所见即所得&#xff0c;不必再费心费力的翻看多个网站。 文末附工具下载链接~ 一、软件简介 本次带来的工具由吾爱的一位大佬开发…

SRRC认证的必要性:保障电子产品质量安全的重要措施

随着电子产品的普及和应用&#xff0c;对电子产品的质量安全要求也越来越高。为了保障消费者的权益和安全&#xff0c;国家对电子产品进行了严格的监管和管理。其中&#xff0c;SRRC认证是保障电子产品质量安全的重要措施之一。 SRRC认证是指在我国境内生产、销售、使用的无线电…

浙大计算机学院2024届推免直博生名单

名单&#xff1a; 分析&#xff1a; 浙大计算机学院共录取推免直博生158人&#xff0c;其中计算机科学与技术专业73人&#xff0c;人工智能专业7人&#xff0c;软件工程专业21人&#xff0c;网络空间安全专业19人&#xff0c;电子信息专业31人&#xff0c;设计专业7人 欢迎关…

idea Plugins 搜索不到插件

Settings — System Settings — HTTP Proxy&#xff0c;打开HTTP Proxy 页面&#xff0c;设置自动发现代理&#xff1a; 勾选Atuto-detect proxy settings&#xff0c;勾选Automatic proxy configuration URL&#xff0c;输入&#xff1a; https://plugins.jetbrains.com/id…

力扣每日一题 ---- 2905. 找出满足差值条件的下标 II

这道题带有绝对值差的题&#xff0c;一看就是双指针的题&#xff0c;并且还带有两个限制&#xff0c;那么我们的做法就是 固定一个条件&#xff0c;维护一个条件 本题还用到了一个贪心思路&#xff0c;会介绍到 那我们怎么固定一个条件&#xff0c;维护一个条件&#xff1f; …

java多线程文件下载器

文章目录 1.简介2.文件下载的核心3.文件下载器的基础代码3.1 HttpURLConnection3.2 用户标识 4.下载信息4.1 计划任务4.2 ScheduledExecutorService&#x1f340; schedule方法&#x1f340; scheduleAtFixedRate方法&#x1f340; scheduleWithFixedDelay方法 5.线程池简介5.1…

python+pytorch人脸表情识别

概述 基于深度学习的人脸表情识别&#xff0c;数据集采用公开数据集fer2013&#xff0c;可直接运行&#xff0c;效果良好&#xff0c;可根据需求修改训练代码&#xff0c;自己训练模型。 详细 一、概述 本项目以PyTorch为框架&#xff0c;搭建卷积神经网络模型&#xff0c;训…

Zigbee—网络层地址分配机制

&#x1f3ac;慕斯主页&#xff1a;修仙—别有洞天 ♈️今日夜电波&#xff1a;孤雏 0:21━━━━━━️&#x1f49f;──────── 4:14 &#x1f504; ◀️ ⏸ ▶️ ☰ &#x1f497;关注…

AI 时代的企业级安全合规策略

目录 漏洞分类管理的流程 安全策略管理 在扫描结果策略中定义细粒度的规则 有效考虑整个组织中的关键漏洞 确保职责分离 尝试组合拳 本文来源&#xff1a;about.gitlab.com 作者&#xff1a;Grant Hickman 在应用程序敏捷研发、敏捷交付的今天&#xff0c;让安全人员跟上…

EasyExcel 导出冻结指定行

导出的实体类 package org.jeecg.modules.eis.test;import com.alibaba.excel.annotation.ExcelProperty; import com.alibaba.excel.annotation.write.style.*; import lombok.Getter; import lombok.Setter; import org.apache.poi.ss.usermodel.HorizontalAlignment;import…

pytorch复现_UNet

什么是UNet U-Net由收缩路径和扩张路径组成。收缩路径是一系列卷积层和汇集层&#xff0c;其中要素地图的分辨率逐渐降低。扩展路径是一系列上采样层和卷积层&#xff0c;其中特征地图的分辨率逐渐增加。 在扩展路径中的每一步&#xff0c;来自收缩路径的对应特征地图与当前特征…