Kafka快速入门:Kafka驱动JavaApi的使用

生产者和消费者是Kafka的核心概念之一,它们在客户端被创建和使用,并且包含了许多与Kafka性能和机制相关的配置。虽然Kafka提供的命令行工具能够执行许多基本操作,但它无法实现所有可能的性能优化。相比之下,使用Java API可以充分利用编程语言的灵活性,对生产者和消费者进行更精细的性能调优。对于大多数中间件,熟悉服务器的命令行操作可能足以帮助学习其API的使用。然而,Kafka则不同,要全面掌握Kafka的所有特性,必须系统地学习和理解其Java API。

1. 基础消费流程

在javaApi中可以通过创建一个Kafka生产者和消费者的配置对象,在new生产者或消费者的类时将配置对象传入,然后生产者实例通过调用send方法发送数据,消费者通过poll方法消费数据,数据需要通过ProducerRecords类封装key和value,并在生产者和消费者配置中为key和value指定序列化和反序列化类(key可以传null,key是在日志回收策略中发挥作用)。经过这样一套操作,消息就可以成功从生产者发往消费者。

package com.kafak.testkafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import java.time.Duration;
import java.util.Properties;@SpringBootTest
class TestKafkaApplicationTests {//预定义Kafka对象实例,因为Kafka对象时线程安全,所以可以定义外面节省资源防止重复创建KafkaProducer<String, String> kafkaProducer;//创建生产者public KafkaProducer<String, String> getKafkaProducer() {//创建生产者配置Properties props = new Properties();//配置Kafka集群地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");//配置序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//返回生产者return new KafkaProducer<String, String>(props);}//创建消费者public KafkaConsumer<String, String> getKafkaConsumer() {//创建消费者配置Properties props = new Properties();//配置Kafka集群地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");//配置消费者组idprops.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");//配置反序列化props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//返回消费者return new KafkaConsumer<>(props);}//通过生产者生产一百条数据@Testvoid kafkaProducerTest() {//获取生产者kafkaProducer = getKafkaProducer();//发送消息for (int i = 0; i < 100; i++) {kafkaProducer.send(new ProducerRecord<String, String>("topicJava", "testKey" + i, "testValue" + i));}}//通过消费者消费消息@Testvoid kafkaConsumerTest() {//创建消费者,由于消费者是线程不安全,所以使用一次实例化一次,可以方式出现线程安全问题KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer();//接受消费者信息,传入100毫秒,消费者会一百毫秒拉去一次消息ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));// 处理消息for (ConsumerRecord<String, String> record : records) {System.out.printf("Consumed message with key %s, value %s, from partition %d with offset %d%n",record.key(), record.value(), record.partition(), record.offset());}}}

2. 消息确认

消息确认的原理性知识可以通过下面这篇文章学习,这里主要讲实操。

Kafka运行机制(二):消息确认,消息日志的存储和回收icon-default.png?t=N7T8https://blog.csdn.net/dxh9231028/article/details/141329851?spm=1001.2014.3001.5501

生产者端

生产者端的消息确认策略由acks配置项控制,其由三种配置方式,其中我在下面这篇文章中详细讲述了相关知识。我们可以通过javaApi配置acks来控制确认策略。

生产者端的消息确认有同步和异步两种方式。

  • 同步消息确认:同步消息确认是生产者实例在调用send方法后紧接着调用get方法,该方法会阻塞线程的继续执行,等待消息发送结果。当消息发送失败时,如果配置了重试机制(通过设置 retries 属性),生产者会自动重试指定的次数。如果在所有重试尝试后仍然失败,最终会抛出异常,通知调用方消息发送失败。
  • 异步消息确认:异步消息确认通过生产者实例调用send方法时传入,第二个回调函数的参数。在成功或失败响应时,执行回调函数,异步消息确认不会阻塞代码,也不会触发自动重试。

消费者端

消费者消费成功在客户端的体现是成功获取到了数据,这本没有什么好说的,不过消费者不仅需要响应客户端数据,还要讲偏移量发送给Kafka,在这一过程中,消费者提供了手动提交和自动提交两种方式。启动自动提交是默认开启的,而手动提交则需要配置enable.auto.commit为false,然后通过创建分区和偏移量的映射关系,通过消费者的commit方法提交偏移量。

代码实现

下面代码中,我创建了四个测试单元,其中前两个测试单元,分别是生产者同步提交和异步提交,而后两个测试单元分别时消费者的自动提交和手动提交。

package com.kafak.testkafka;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;@SpringBootTest
class TestKafkaApplicationTests {//预定义Kafka对象实例,因为Kafka对象时线程安全,所以可以定义外面节省资源防止重复创建KafkaProducer<String, String> kafkaProducer;//创建生产者public KafkaProducer<String, String> getKafkaProducer() {//创建生产者配置Properties props = new Properties();//配置消息确认策略props.put(ProducerConfig.ACKS_CONFIG, "all");//配置重试次数props.put(ProducerConfig.RETRIES_CONFIG,3);//配置Kafka集群地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");//配置序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//返回生产者return new KafkaProducer<String, String>(props);}//创建消费者public KafkaConsumer<String, String> getKafkaConsumer(Boolean isAutoCommit) {//创建消费者配置Properties props = new Properties();//配置Kafka集群地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");//配置消费者组idprops.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");//配置反序列化props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//判断当前消费者是否开启自动提交if (!isAutoCommit) {//关闭自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);}else{//设置自动提交间隔时间1sprops.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");}//返回消费者return new KafkaConsumer<>(props);}//生产者同步确认@Testvoid kafkaProducerGetTest() {if(kafkaProducer == null) {kafkaProducer = getKafkaProducer();}//同步确认消息是否发送成功for (int i = 0; i < 100; i++) {try{RecordMetadata topicJava = kafkaProducer.send(new ProducerRecord<String, String>("topicJava", "testKey" + i, "testValue" + i)).get();}catch (Exception e) {e.printStackTrace();}}kafkaProducer.close();}//生产者异步确认@Testvoid kafkaProduceSyncTest() {if(kafkaProducer == null) {kafkaProducer = getKafkaProducer();}//异步确认是否发送成功for (int i = 0; i < 100; i++) {kafkaProducer.send(new ProducerRecord<String, String>("topicJava", "testKey" + i, "testValue" + i), (metadata, exception) -> {if (exception == null) {System.out.printf("发送消息成功, metadata=%s%n", metadata);} else {System.err.printf("发送消息失败, exception=%s%n", exception.getMessage());}});}kafkaProducer.close();}//消费者自动提交@Testvoid kafkaAutoCommitConsumerTest() {//创建消费者开启自动提交KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer(true);//消费数据流程中无需负责偏移量提交while (true) {//接受消费者信息,传入100毫秒,消费者会一百毫秒拉去一次消息ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));//处理消息for (ConsumerRecord<String, String> record : records) {System.out.printf("消息消费成功, key=%s, value=%s, partition=%d, offset=%d%n",record.key(), record.value(), record.partition(), record.offset());}}}//消费者手动提交@Testvoid kafkaSyncCommitConsumerTest() {//创建消费者关闭自动提交KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer(false);//消费数据流程中需要在消费数据后,提交偏移量while (true) {//接受消费者信息,传入100毫秒,消费者会一百毫秒拉去一次消息ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));// 处理消息for (ConsumerRecord<String, String> record : records) {System.out.printf("消息消费成功, key=%s, value=%s, partition=%d, offset=%d%n",record.key(), record.value(), record.partition(), record.offset());//创建分区和偏移量的映射类Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();//讲分区和偏移量的数据存入映射类offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));//偏移量提交kafkaConsumer.commitSync(offsets);}}}}

3. 批处理

批处理在生产者端,和消费者端也有不同的实现。我在Kakfa基本概念一文中清楚的讲解了批处理的概念,文章如下

Kafka基本概念icon-default.png?t=N7T8https://blog.csdn.net/dxh9231028/article/details/141270920?spm=1001.2014.3001.5501

生产者

在生产者端,生产者实例的send方法会发送消息到缓冲区中,而缓冲区消息何时发送给Kafka集群,则是通过配置batch.size和linger.ms配置,来实现当缓冲区存入多少消息,和距离上一次发送消息多久后,来发送这一轮缓冲区的消息到Kafka集群,代码实现如下

    //创建生产者public KafkaProducer<String, String> getKafkaProducer() {//创建生产者配置Properties props = new Properties();//配置生产者批处理//缓冲区大小最大为16384比特props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//距离上次发送消息时间隔3sprops.put(ProducerConfig.LINGER_MS_CONFIG,"3000");//配置消息确认策略props.put(ProducerConfig.ACKS_CONFIG, "all");//配置重试次数props.put(ProducerConfig.RETRIES_CONFIG,3);//配置Kafka集群地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");//配置序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//返回生产者return new KafkaProducer<String, String>(props);}

消费者

消费者端批处理,消费者在拉去消息时,会在fetch.max.bytes,max.partition.fetch.bytes和max.poll.records三个配置项,以及传入poll方法的超时时间参数的限制下,尽可能多的拉取更多消息。

  • fetch.max.bytes:fetch.max.bytes 是指 Kafka 消费者单次从服务器拉取数据时能够获取的最大字节数。这是全局的上限,控制每次 poll() 操作可以拉取的数据量总和。 默认值:50MB(即 52428800 字节)。
  • max.partition.fetch.bytes:max.partition.fetch.bytes 是指 Kafka 消费者从单个分区拉取消息时能获取的最大字节数,当消费者数量少于主题分区数量时,一个消费者可能会负责多个分区。默认值:1MB(即 1048576 字节)。
  • max.poll.records:max.poll.records 是指 Kafka 消费者每次调用 poll() 方法时能够拉取的最大消息条数。 默认值:500 条消息。

代码实现如下

    //创建消费者public KafkaConsumer<String, String> getKafkaConsumer(Boolean isAutoCommit) {//创建消费者配置Properties props = new Properties();//消费者批处理相关配置//消费缓冲区大小,也就是一次消费最多能消费多少比特消息props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,16384);//一次消费一个分区最多能消费多少比特props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,8192);//一次消费最多能消费多少条数据props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1000);//配置Kafka集群地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");//配置消费者组idprops.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");//配置反序列化props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//判断当前消费者是否开启自动提交if (!isAutoCommit) {//关闭自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);}else{//设置自动提交间隔时间1sprops.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");}//返回消费者return new KafkaConsumer<>(props);}

4. 事务操作

Kafka驱动支持事务操作,允许许生产者在多个主题和分区上以原子方式写入消息。这意味着你可以确保一组消息要么全部成功写入Kafka,要么全部失败。

事务操作首先通过生产者实例调用生产者实例的initTransactions方法,向kafka集群申请一个映射当前生产者的事务Id,然后就可以通过调用生产者实例的beginTransaction方法,开启一个事务,进行消息发送,最终通过调用commitTransaction方法完成事务的提交,如果中途发生异常则通过abortTransaction对当前事务进行回滚,代码实例如下

package com.kafak.testkafka;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;@SpringBootTest
class TestKafkaApplicationTests {//预定义Kafka对象实例,因为Kafka对象时线程安全,所以可以定义外面节省资源防止重复创建KafkaProducer<String, String> kafkaProducer;//创建生产者public KafkaProducer<String, String> getKafkaProducer() {//创建生产者配置Properties props = new Properties();//配置生产者批处理//缓冲区大小最大为16384比特props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//距离上次发送消息时间隔3sprops.put(ProducerConfig.LINGER_MS_CONFIG,"3000");//配置消息确认策略props.put(ProducerConfig.ACKS_CONFIG, "all");//配置重试次数props.put(ProducerConfig.RETRIES_CONFIG,3);//配置Kafka集群地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");//配置序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//返回生产者return new KafkaProducer<String, String>(props);}//测试事务@Testvoid kafkaProducerTransactionTest() {if(kafkaProducer == null) {kafkaProducer = getKafkaProducer();}kafkaProducer.initTransactions();try{kafkaProducer.beginTransaction();//消息发送相关操作for (int i = 0; i < 100; i++) {try{RecordMetadata topicJava = kafkaProducer.send(new ProducerRecord<String, String>("topicJava", "testKey" + i, "testValue" + i)).get();}catch (Exception e) {e.printStackTrace();}}kafkaProducer.commitTransaction();}catch (Exception e) {e.printStackTrace();kafkaProducer.abortTransaction();}kafkaProducer.close();}}

5. 自定义分区器

Kafka允许用户自定义分区器,实现特定的分区策略。可以通过实现Partitioner接口来创建自定义分区器。实现Partitioner接口需要实现三个方法,分别是partition,configure,close。

partition方法

partition方法是实现分区逻辑其的主要方法,其接受六个参数,分别是

  • String topic:消息要发送到的Kafka主题名称
  • Object key:消息的 key,可能为 null。
  • byte[] keyBytes:序列化后的 key,可能为 null
  • Object value:消息的 value,可以为任意对象
  • byte[] valueBytes:序列化后的 value,可能为 null。
  • Cluster cluster:Kafka集群的元数据信息,包括主题的分区数、每个分区的领导者等

partition方法的返回值则是发送分区的编号,通过这个机制可以实现不同逻辑的分区器。

configure方法

configuer方法在自定义分区类初始化时调用,当设计一些复杂操作,比如在发送消息前要和数据库交互时,可以在configure中完成数据库的连接。

close方法

close在分区逻辑执行完后调用,和configure一样,在复杂操作时,用于关闭分区逻辑中创建的连接,或一些内存资源等

假设我有一个三主机集群,其中30主机性能最好,31其次,32最差,我要通过自定义分区,将消息发送到三个分区的比例为3:2:1,通过Partitioner接口,可以简单的通过如下方式实现

package com.kafak.testkafka;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;import java.util.List;
import java.util.Map;public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {//获取分区元数据List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(s);//创建一个0-100的随机数double num = Math.random() * 100;//默认传递分区号Integer finalPartition = 0;for (PartitionInfo partitionInfo : partitionInfos) {//获取分区的leaderNode leader = partitionInfo.leader();//获取分区leader的ip和端口String leaderAddress = leader.host() + ":" + leader.port(); // 生成 "host:port" 格式的字符串//如果随机数在0-50之间,发送消息至192.168.142.30:9092if (num < 50 && leaderAddress.equals("192.168.142.30:9092")) {finalPartition = partitionInfo.partition();break; //如果随机数在50-82之间,发送消息至192.168.142.31:9092} else if (num < 82 && num >= 50 && leaderAddress.equals("192.168.142.31:9092")) {finalPartition = partitionInfo.partition();break;//如果随机数在82-100之间,发送消息至192.168.142.32:9092} else if (num < 100 && num >= 82 && leaderAddress.equals("192.168.142.32:9092")) {finalPartition = partitionInfo.partition();break;}}//返回最终分区号return finalPartition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

在生产者配置中通过partitioner_class配置自定义分区器,代码如下

 public KafkaProducer<String, String> getKafkaProducer() {//创建生产者配置Properties props = new Properties();//启用自定义分区器props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.kafak.testkafka.CustomPartitioner");//配置生产者批处理//缓冲区大小最大为16384比特props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//距离上次发送消息时间隔3sprops.put(ProducerConfig.LINGER_MS_CONFIG,"3000");//配置消息确认策略props.put(ProducerConfig.ACKS_CONFIG, "all");//配置重试次数props.put(ProducerConfig.RETRIES_CONFIG,3);//配置Kafka集群地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");//配置序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//返回生产者return new KafkaProducer<String, String>(props);}

如此,便可以实现一个自定义分区策略。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/408654.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

zigbee笔记、十五、组播通信原理

一、zigbee四种通讯 1、单播&#xff08;略&#xff09; 2、广播&#xff08;略&#xff09; 3、组播&#xff1a;在zigbee网络中&#xff0c;模块可以用分组来标记&#xff0c;发送的模块如果发送的组号和网络里面标记接收模块的组号相对应&#xff0c;那么这些模块就可以拿到…

C#/.NET/.NET Core技术前沿周刊 | 第 1 期(2024年8.12-8.18)

前言 C#/.NET/.NET Core技术前沿周刊&#xff0c;你的每周技术指南针&#xff01;记录、追踪C#/.NET/.NET Core领域、生态的每周最新、最实用的技术文章、社区动态、优质项目和学习资源等。让你时刻站在技术前沿&#xff0c;助力技术成长与视野拓宽。 欢迎投稿&#xff0c;推荐…

innodb_buffer_pool_size在线缩小操作

一、背景 测试数据库内存32G&#xff0c;只有MySQL数据库&#xff0c;但是innodb_buffer_pool_size设置了24G&#xff0c;导致经常出现lack of memory问题、lack of swap问题。 因为使用了MySQL5.7.36版本&#xff0c;利用innodb_buffer_pool_size参数值可在线调整的新特性&…

C++函数调用栈从何而来

竹杖芒鞋轻胜马,谁怕?一蓑烟雨任平生~ 个人主页&#xff1a; rainInSunny | 个人专栏&#xff1a; C那些事儿、 Qt那些事儿 文章目录 写在前面原理综述x86架构函数调用栈分析如何获取rbp寄存器的值总结 写在前面 程序员对函数调用栈是再熟悉不过了&#xff0c;无论是使用IDE…

基于cubemx的STM32的freertos的串口通信

1、任务描述 使用freertos系统实现电脑调试助手和正点原子开发板STM32F103ZET6的串口通信。 2、cubemx设置 3、程序代码 &#xff08;1&#xff09;添加usart1.c #include "usart1.h"#include "usart.h"/**********重定义函数**********/struct __FILE …

阵列信号处理2_阵列信号最优处理常用准则(CSDN_20240825)

目录 最小均方误差&#xff08;Minimum Square Error&#xff0c;MSE&#xff09;准则 最大信噪比&#xff08;Maximum Signal Noise Ratio&#xff0c;MSNR&#xff09;准则 极大似然&#xff08;Maximum Likehood, ML&#xff09;准则 最小方差无损响应&#xff08;Minim…

速通教程:如何使用Coze+剪映,捏一个爆款悟空视频

程哥最近做了一个和黑神话悟空有关的视频&#xff0c;没想到就火了&#xff0c;视频主打一个玉石风格&#xff0c;就是下面这个视频。 视频请移步飞书观看&#xff1a;黑神话悟空玉石版 制作过程不算很复杂&#xff0c;全程只需要用到Coze智能体和剪映这两个工具。 智能体用…

【JVM】亿级流量调优(一)

亿级流量调优 oop模型 前面的klass模型&#xff0c;它是Java类的元信息在JVM中的存在形式。这个oop模型是Java对象在JVM中的存在形式 内存分配策略: 1.空闲列表2.指针碰撞(jvm采用的) 2.1 top指针:执行的是可用内存的起始位置 2.2 采用CAS的方式3.TLAB 线程私有堆4.PLAB 老年…

使用DropZone+SpringBoot实现图片的上传和浏览

经常在项目中需要使用上传文件功能&#xff0c;找了不少前端上传组件&#xff0c;都不是很好用&#xff0c;今天尝试了一下DropZone&#xff0c;发现不错&#xff0c;顺便记录一下使用过程&#xff0c;方便后续查阅。在做开发的时候&#xff0c;经常需要调研一些技术&#xff0…

C# 运算符

运算符是一种告诉编译器执行特定的数学或逻辑操作的符号。C# 有丰富的内置运算符&#xff0c;分为一下六类&#xff1a; 算术运算符 关系运算符 逻辑运算符 位运算符 赋值运算符 杂项运算符 算术运算符 C# 支持的所有算术运算符。假设变量 A 的值为 10&#xff0c;变量 B 的值…

安全面试常见问题任意文件下载

《网安面试指南》http://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247484339&idx1&sn356300f169de74e7a778b04bfbbbd0ab&chksmc0e47aeff793f3f9a5f7abcfa57695e8944e52bca2de2c7a3eb1aecb3c1e6b9cb6abe509d51f&scene21#wechat_redirect 1.1 任意文件下…

旅游社交小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;每日签到管理&#xff0c;景点推荐管理&#xff0c;景点分类管理&#xff0c;防疫查询管理&#xff0c;美食推荐管理&#xff0c;酒店推荐管理&#xff0c;周边推荐管理 微信端账…

《数据结构》顺序表+算法代码+动画演示-C语言版

目录 顺序表概念 顺序表初始化 顺序表销毁 顺序表尾插 顺序表尾删 顺序表头删 顺序表头插 顺序表pos位置插入 顺序表pos位置删除 顺序表全部代码如下&#xff1a; 顺序表概念 顺序表是用一段 物理地址连续 的存储单元依次存储数据元素的线性结构&#xff0c;一般情况下…

多个程序监听不同网卡的相同端口、相同网卡不同IP的相同端口

1 概述 一个主机上的多个程序监听同一个端口&#xff0c;是否一定存在冲突&#xff1f;如果是多网卡、单网卡多IP的情景下&#xff0c;多个程序是可以独立监听的。 2 多个程序监听不同网卡的相同端口 3 多个程序监听同一个网卡不同IP的相同端口 4 小结 多个程序监听同一个网…

【C语言】常见文件操作

文件的常见操作 #include<stdio.h>// 由于devc代码编码为ANCI&#xff0c;故读取的文件中若有中文&#xff0c;请设置文件编码为ANCI&#xff0c;否则会乱码 // 读文件 void test1() {char ch;FILE *fp; // 创建文件指针fp fopen("./file.txt", "r"…

pycharm修改文件大小限制

场景&#xff1a; 方法&#xff1a; 打开pycharm 安装目录下的idea.properties 增加配置项&#xff1a;idea.max.intellisense.filesize99999

java后端请求与响应总结

get 请求&#xff1a;将参数写在请求路径中&#xff08;请求路径跟一个&#xff1f;后面跟参数多个参数之间用&连接&#xff09; post 请求&#xff1a;将参数写在请求体中中 一、请求 1.简单参数 如 传一个或两个字符串、整数等 例如串一个用户名和密码 如果传入的数…

【自动驾驶】控制算法(四)坐标变换与横向误差微分方程

写在前面&#xff1a; &#x1f31f; 欢迎光临 清流君 的博客小天地&#xff0c;这里是我分享技术与心得的温馨角落。&#x1f4dd; 个人主页&#xff1a;清流君_CSDN博客&#xff0c;期待与您一同探索 移动机器人 领域的无限可能。 &#x1f50d; 本文系 清流君 原创之作&…

从法律风险的角度来看,项目经理遇到不清楚或不明确问题时的处理

大家好&#xff0c;我是不会魔法的兔子&#xff0c;在北京从事律师工作&#xff0c;日常分享项目管理风险预防方面的内容。 序言 在项目开展过程中&#xff0c;有时候会遇到一些不清楚或不明确的状况&#xff0c;但碍于项目进度的紧迫性&#xff0c;不得不硬着头皮做决策&…

Golang | Leetcode Golang题解之第368题最大整除子集

题目&#xff1a; 题解&#xff1a; func largestDivisibleSubset(nums []int) (res []int) {sort.Ints(nums)// 第 1 步&#xff1a;动态规划找出最大子集的个数、最大子集中的最大整数n : len(nums)dp : make([]int, n)for i : range dp {dp[i] 1}maxSize, maxVal : 1, 1fo…