Kafka, 构建TB级异步消息系统
1.快速入门
1.1 阻塞队列
在生产线程 和 消费线程 之间起到了 , 缓冲作用,即避免CPU 资源被浪费掉
- BlockingQueue
- 解决 线程通信 的问题
- 阻塞方法 put 、 take
- 生产者、消费者 模式
- 生产者:产生数据的线程
- 消费者:使用数据的线程
- 实现类
- ArrayBlockingQueue
- LinkedBlockingQueue
- PriorityBlockingQueue、SynchronousQueue、DelayQueue等
1.2 Kafaka 入门
Kafaka 简介
- Kafaka 是一个分布式的(消息队列 发展到---》 功能更加丰富) 流媒体平台
- 应用:消息系统、日志收集、用户行为追踪、流式处理
Kafaka 特点
- 高吞吐量(对于硬盘的顺序读写,性能很高,高于对于内存的随机读写)
- 消息持久化(将数据存储在硬盘中)
- 高可靠性 (分布式集群部署,一台服务器挂了,切换到其他服务器)
- 高扩展性(服务器不够用,则加一台服务器)
Kafaka 术语
- Broker(集群中的每一台服务器称为一个Broker)、Zookeeper(管理集群的软件)
- Topic(发布订阅模式,生产者把消息发送的位置-》topic)、Partition(表示对于主题位置的分区,每一个分区按照顺序往其中追加数据)、Offset(消息在分区内存在的索引)
- Leader Replica(主副本可以提供想要读取某个分区的数据,如果主副本挂了,则从集群中选用一个从副本作为主副本) 、Follower Replica(从副本只是 备份,不负责做响应)
- 每一个分区有多个副本,如果一个副本坏了,还有备份,提高容错率
1.3 Spring 整合 Kafka
- 引入依赖
- spring-kafka
- 配置Kafka
- 配置server
- 配置consumer
- 访问Kafka
- 生产者
- kafkaTemplate.send(topic, data);
- 消费者
- @KafkaListener(topics = {"test"}) public void handleMessage(ConsumerRecord record) {}
2.应用
2.1 安装
(1)安装包位置
链接:https://pan.baidu.com/s/1QjZdOUYdmSCSf0zjprJQIQ
提取码:9630
下载后解压缩
(2)相关配置
1
2
2.2 启动+使用
(1)启动zookeeper
C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
(2)启动kafka
C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0>bin\windows\kafka-server-start.bat config\server.properties
(3)创建主题
C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-topics.bat --create --bootstrap-server localhost:9092 -replication-factor 1 --partitions 1 --topic test1
C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-topics.bat --list --bootstrap-server localhost:9092
__consumer_offsets
test
test1
(4)创建生产者
C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test1
(5)创建消费者
C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1 --from-beginning
再次生产消息,会自动消费消息
2.3 spring中整合kafka
(1)配置Properties
#9. kafkaProperties spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=community-consumer-group # 是否自动提交(记录) 消费者偏移量 spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=3000
其中spring.kafka.consumer.group-id=community-consumer-group与。。。保持一致
(2)导入依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency>
(3)test
1.定义producer
@Component
class KafkaProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic, String content) {kafkaTemplate.send(topic,content);}
}
2.定义consumer
@Component
class KafkaConsumer {@KafkaListener(topics ={"test"})public void handleMessage(ConsumerRecord record){System.out.println(record.value());//PHDVB干嘛呢//呕吼}
}
3.编写测试类
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {@Autowiredprivate KafkaProducer kafkaProducer;@Testpublic void testKafka() {kafkaProducer.sendMessage("test","PHDVB干嘛呢");kafkaProducer.sendMessage("test","呕吼");// 生产者发消息是 主动的 ,消费者收消息是 被动地try {Thread.sleep(1000 * 10);} catch (InterruptedException e) {e.printStackTrace();}}
}