JavaWeb_LeadNews_Day6-Kafka

JavaWeb_LeadNews_Day6-Kafka

  • Kafka
    • 概述
    • 安装配置
    • kafka入门
    • kafka高可用方案
    • kafka详解
      • 生产者同步异步发送消息
      • 生产者参数配置
      • 消费者同步异步提交偏移量
    • SpringBoot集成kafka
  • 自媒体文章上下架
    • 实现思路
    • 具体实现
  • 来源
  • Gitee

Kafka

概述

  • 对比
  • 选择
  • 介绍
    • producer: 发布消息的对象称之为主题生产者 (Kafka topic producer)
    • topic: Kafka将消息分门别类,每一类的消息称之为一个主题 (Topic)
    • consumer:订阅消息并处理发布的消息的对象称之为主题消费者 (consumers)
    • broker:已发布的消息保存在一组服务器中,称之为Kafka集群,集群中的每一个服务器都是一个代理(Broker)。消费者可以订阅个或多个主题 (topic),并从Broker拉数据,从而消费这些已发布的消息

安装配置

  • 安装zookeeper
    // 下载zookeeper镜像
    docker pull zookeeper:3.4.14
    // 创建容器
    docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
    
  • 安装kafka
    // 下载kafka镜像
    docker pull wurstmeister/kafka:2.12-2.3.1
    // 创建容器
    docker run -d --name kafka \
    --env KAFKA_ADVERTISED_HOST_NAME=192.168.174.133 \
    --env KAFKA_ZOOKEEPER_CONNECT=192.168.174.133:2181 \
    --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.174.133:9092 \
    --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
    --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
    --net=host wurstmeister/kafka:2.12-2.3.1// 解释
    --net=host,直接使用容器宿主机的网络命名空间,即没有独立的网络环境。它使用宿主机的ip和端口(云主机会不好使)
    

kafka入门

  • 依赖
    <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
    </dependency>
    
  • Producer
    public class ProducerQuickStart {public static void main(String[] args) {// 1. kafka链接配置信息Properties prop = new Properties();// 1.1 kafka链接地址prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");// 1.2 key和value的序列化prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 2. 创建kafka生产者对象KafkaProducer<String, String> producer = new KafkaProducer<>(prop);// 3. 发送信息// 参数列表: topic, key, valueProducerRecord<String, String> record = new ProducerRecord<>("topic-first", "key1", "Hello Kafka!");producer.send(record);// 4. 关闭消息通道// 必须关闭, 否则消息发送bucgproducer.close();}
    }
    
  • Consumer
    public class ConsumerQuickStart {public static void main(String[] args) {// 1. kafka的配置信息Properties prop = new Properties();// 1.1 链接地址prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");// 1.2 key和value的反序列化器prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 1.3 设置消费者组prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");// 2. 创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);// 3. 订阅主题consumer.subscribe(Collections.singleton("topic-first"));// 4. 拉取信息while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key());System.out.println(record.value());}}}
    }
    
  • 总结
    • 同一组只有一个消费者能够接收到消息, 如果需要所有消费者都能接收到消息, 需要消费者在不同的组

kafka高可用方案

  • 集群

  • 备份

    kafka定义了两类副本:

    • 领导者副本
    • 追随者副本

    数据在领导者副本存储后, 会同步到追随者副本

    同步方式
    leader失效后, 选择leader的原则

    1. 优先从ISR中选取, 因为ISR的数据和leader是同步的.
    2. ISR中的follower都不行了, 就从其他的follower中选取.
    3. 当所有的follower都失效了, 第一种是等待ISR中的follower活过来, 数据可靠, 但等待时间不确定, 第二种是等待任意follower活过来, 最快速度恢复可用性, 但数据不一定完整.

kafka详解

生产者同步异步发送消息

// 同步发送
RecordMetadata metadata = producer.send(record).get();
System.out.println(metadata.offset());// 异步发送
producer.send(record, new Callback(){@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e != null) {System.out.println("记录异常信息到日志表中");}System.out.println(recordMetadata.offset());}
});

生产者参数配置

  • 消息确认
    确认机制说明
    acks=0生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快
    acks=1(默认值)只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应
    acks=all只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应
    prop.put(ProducerConfig.ACKS_CONFIG, "all");
    
  • 消息重传
    设置消息重传次数, 默认每次重试之间等待100ms
    prop.put(ProducerConfig.RETRIES_CONFIG, 10);
    
  • 消息压缩
    默认情况, 消息发送不会压缩
    使用压缩可以降低网络传输开销和存储开销, 而这往往是向kafka发送消息的瓶颈所在
    压缩算法说明
    snappy占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果看重性能和网络带宽,建议采用
    lz4占用较少的 CPU,压缩和解压缩速度较快,压缩比也很客观
    gzip占用较多的CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法
    prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    

消费者同步异步提交偏移量

// 同步提交偏移量
consumer.commitSync();// 异步提交偏移量
consumer.commitAsync(new OffsetCommitCallback(){@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if(e!=null){System.out.println("记录错误的提交偏移量"+map+", 异常信息为"+e);}}
});// 同步异步提交
try {while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key());System.out.println(record.value());System.out.println(record.partition());System.out.println(record.offset());}// 异步提交偏移量consumer.commitAsync();}
} catch (Exception e) {e.printStackTrace();System.out.println("记录错误的信息:"+e);
}finally {// 同步consumer.commitSync();
}

SpringBoot集成kafka

  • 依赖
    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId>
    </dependency>
    
  • 配置
    server:port: 9991
    spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.174.133:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
  • Producer
    @RestController
    public class HelloController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/hello")public String hello(){kafkaTemplate.send("itcast-topic", "黑马程序员");return "ok";}
    }
    
  • Consumer
    @Component
    public class HelloListener {@KafkaListener(topics = "itcast-topic")public void onMessage(String message){if(!StringUtils.isEmpty(message)){System.out.println(message);}}
    }
    
  • 传递对象
    // Producer
    User user = new User();
    user.setName("tom");
    user.setAge(18);
    kafkaTemplate.send("itcast-topic", JSON.toJSONString(user));// Consumer
    System.out.println(JSON.parseObject(message, User.class));
    

自媒体文章上下架

实现思路

具体实现

  • Producer
    public ResponseResult downOrUp(WmNewsDto dto) {// 1. 检验参数// 1.0 检查文章dto是否为空if(dto == null){return ResponseResult.errorRe(AppHttpCodeEnum.PARAM_INVALID, "不可缺少");}// 1.1 检查文章上架参数是否合法if(dto.getEnable() != 0 && dto.getEnabl!= 1){// 默认上架dto.setEnable((short) 1);}// 2. 查询文章WmNews news = getById(dto.getId());if(news == null){return ResponseResult.errorRe(AppHttpCodeEnum.DATA_NOT_EXIST, 存在");}// 3. 查询文章状态if(news.getStatus() != WmNews.StaPUBLISHED.getCode()){return ResponseResult.errorRe(AppHttpCodeEnum.PARAM_INVALID, 章不是发布状态, 不能上下架");}// 4. 上下架news.setEnable(dto.getEnable());updateById(new// 5. 发送消息, 通知article修改文章的配置if(news.getArticleId() != null){HashMap<String, Object> map = HashMap<>();map.put("articleId", news.getArtic());map.put("enable", news.getEnable());kafkaTemplate.(WmNewsMessageConstaWM_NEWS_UP_OR_DOWN_TOPIC, JtoJSONString(map));return ResponseResult.okRe(AppHttpCodeEnum.SUCCESS);
    }
    
  • Consumer
// Listener
@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
public void onMessage(String message)
{if(StringUtils.isNotBlank(message)){Map map = JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);}
}// Service
public void updateByMap(Map map) {// 0 下架, 1 上架Object enable = map.get("enable");boolean isDown = true;if(enable.equals(1)){isDown = false;}// 修改文章update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId, map.get("articleId")).set(ApArticleConfig::getIsDown, isDown));
}

来源

黑马程序员. 黑马头条

Gitee

https://gitee.com/yu-ba-ba-ba/leadnews

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/97973.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机视觉之三维重建(一)(摄像机几何)

针孔摄像机 添加屏障&#xff1a; 使用针孔(o光圈针孔摄像机中心)&#xff0c;实现现实与成像一对一映射&#xff0c;减少模糊。其中针孔与像平面的距离为f(焦距)&#xff1b;虚拟像平面位于针孔与真实物体之间&#xff0c;与像平面互为倒立关系。位置映射&#xff1a;利用相似…

【王道-进程与线程】

#pic_center R 1 R_1 R1​ R 2 R^2 R2 目录 知识框架No.0 引言No.1 进程的概念、组成、特征一、进程的概念二、进程的组成1、PCB进程控制块2、程序段/数据段 三、程序是如何运行的&#xff1f;四、进程的特征五、总结 No.2 进程的状态转换和组织一、进程的状态1、创建态、就绪态…

听GPT 讲Prometheus源代码--discovery

Prometheus是一个开源的系统监控和警报工具包&#xff0c;以下是Prometheus源代码中一些主要的文件夹及其作用&#xff1a; cmd/&#xff1a;这个目录包含了Prometheus主要的命令行工具&#xff0c;如prometheus/&#xff0c;promtool/等。每个子目录都代表一个可执行的命令行应…

常见前端基础面试题(HTML,CSS,JS)(三)

JS 中如何进行数据类型的转换&#xff1f; 类型转换可以分为两种&#xff0c;隐性转换和显性转换 显性转换 主要分为三大类&#xff1a;数值类型、字符串类型、布尔类型 三大类的原始类型值的转换规则我就不一一列举了 数值类型&#xff08;引用类型转换&#xff09; Numbe…

我和 TiDB 的故事 | 远近高低各不同

作者&#xff1a; ShawnYan 原文来源&#xff1a; https://tidb.net/blog/b41a02e6 Hi, TiDB, Again! 书接上回&#xff0c; 《我和 TiDB 的故事 | 横看成岭侧成峰》 &#xff0c;一年时光如白驹过隙&#xff0c;这一年我好似在 TiDB 上投入的时间总量不是很多&#xff0…

回归预测 | MATLAB实现CSO-SVM布谷鸟优化算法优化支持向量机多输入单输出回归预测(多指标,多图)

回归预测 | MATLAB实现CSO-SVM布谷鸟优化算法优化支持向量机多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现CSO-SVM布谷鸟优化算法优化支持向量机多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09;效果一…

redis 存储结构原理 2

咱们接着上一部分来进行分享&#xff0c;我们可以在如下地址下载 redis 的源码&#xff1a; https://redis.io/download 此处我下载的是 redis-6.2.5 版本的&#xff0c;xdm 可以直接下载上图中的 **redis-6.2.6 **版本&#xff0c; redis 中 hash 表的数据结构 redis hash …

RFID技术助力汽车零配件装配产线,提升效率与准确性

随着科技的不断发展&#xff0c;越来越多的自动化设备被应用到汽车零配件装配产线中。其中&#xff0c;射频识别&#xff08;Radio Frequency Identification&#xff0c;简称RFID&#xff09;技术凭借其独特的优势&#xff0c;已经成为了这一领域的重要技术之一。本文将介绍RF…

redis乐观锁+启用事务解决超卖

乐观锁用于监视库存&#xff08;watch&#xff09;&#xff0c;然后接下来就启用事务。 启用事务&#xff0c;将减库存、下单这两个步骤&#xff0c;放到一个事务当中即可解决秒杀问题、防止超卖。 但是&#xff01;&#xff01;&#xff01;乐观锁&#xff0c;会带来" …

C++运算符重载

C运算符重载 C运算符重载&#xff1a;使对象的运算表现得和编译器内置类型一样。 C实现复数类实现运算符重载 C类对象操作符重载函数函数时&#xff0c;会优先调用类的成员方法&#xff0c;没有找到再去全局去寻找对应的方法。 在调用某些操作符重载函数时&#xff0c;如果…

创建密码库/创建用户帐户/更新 Ansible 库的密钥/ 配置cron作业

目录 创建密码库 创建用户帐户 更新 Ansible 库的密钥 配置cron作业 创建密码库 按照下方所述&#xff0c;创建一个 Ansible 库来存储用户密码&#xff1a; 库名称为 /home/curtis/ansible/locker.yml 库中含有两个变量&#xff0c;名称如下&#xff1a; pw_developer&#…

YOLOv5、YOLOv8改进:S2注意力机制

目录 1.简介 2.YOLOv5改进 2.1增加以下S2-MLPv2.yaml文件 2.2common.py配置 2.3yolo.py配置 1.简介 S2-MLPv2注意力机制 最近&#xff0c;出现了基于 MLP 的视觉主干。与 CNN 和视觉Transformer相比&#xff0c;基于 MLP 的视觉架构具有较少的归纳偏差&#xff0c;在图像识…

中国剩余定理及扩展

目录 中国剩余定理解释 中国剩余定理扩展——求解模数不互质情况下的线性方程组&#xff1a; 代码实现&#xff1a; 互质&#xff1a; 非互质&#xff1a; 中国剩余定理解释 在《孙子算经》中有这样一个问题&#xff1a;“今有物不知其数&#xff0c;三三数之剩二&#x…

go es实例

go es实例 1、下载第三方库 go get github.com/olivere/elastic下载过程中出现如下报错&#xff1a; 解决方案&#xff1a; 2、示例 import package mainimport ("context""encoding/json""fmt""reflect""time""…

【前端】快速掌握HTML+CSS核心知识点

文章目录 1.HTML核心基础知识1.1.编写第一个HTML网页1.2.超链接a标签和路径1.3.图像img标签的用法1.4.表格table标签用法1.5.列表ul、ol、dl标签用法1.6.表单form标签用法1.7.区块标签和行内标签用法 2.CSS核心基础知识2.1.CSS标签选择器viewport布局2.2.CSS样式的几种写法2.3.…

【Linux取经路】解析环境变量,提升系统控制力

文章目录 一、进程优先级1.1 什么是优先级&#xff1f;1.2 为什么会有优先级&#xff1f;1.3 小结 二、Linux系统中的优先级2.1 查看进程优先级2.2 PRI and NI2.3 修改进程优先级2.4 进程优先级的实现原理2.5 一些名词解释 三、环境变量3.1 基本概念3.2 PATH&#xff1a;Linux系…

k8s 常见面试题

前段时间在这个视频中分享了 https://github.com/bregman-arie/devops-exercises 这个知识仓库。 这次继续分享里面的内容&#xff0c;本次主要以 k8s 相关的问题为主。 k8s 是什么&#xff0c;为什么企业选择使用它 k8s 是一个开源应用&#xff0c;给用户提供了管理、部署、扩…

Learning to Super-resolve Dynamic Scenes for Neuromorphic Spike Camera论文笔记

摘要 脉冲相机使用了“integrate and fire”机制来生成连续的脉冲流&#xff0c;以极高的时间分辨率来记录动态光照强度。但是极高的时间分辨率导致了受限的空间分辨率&#xff0c;致使重建出的图像无法很好保留原始场景的细节。为了解决这个问题&#xff0c;这篇文章提出了Sp…

idea2023 springboot2.7.5+mybatisplus3.5.2+jsp 初学单表增删改查

创建项目 修改pom.xml 为2.7.5 引入mybatisplus 2.1 修改pom.xml <dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.2</version></dependency><!--mysq…

【STM32 学习】电源解析(VCC、VDD、VREF+、VBAT)

VCC电源电压GND电源供电负电压&#xff08;通常接地&#xff09;VDD模块工作正电压VSS模块工作负电压VREFADC参考正电压VREF-ADC参考负电压VBAT电池或其他电源供电VDDA模拟供电正电压VSSA模拟供电负电压 一、VCC&#xff08;供电电压&#xff09; VCC是指芯片的电源电压&#…