Schema注册表
为了提升kafka的性能,减少网络传输和存储的数据大小,可以把数据的schema部分单独存储到外部的schema注册表中,整体架构如下图所示:
1)把所有数据需要用到的 schema 保存在注册表里,然后在记录里引用 schema ID。
2)消费者使用 ID 从注册表里拉取 schema 来反序列化记录。
3)序列化器和反序列化器分别负责处理 schema 的注册和拉取。
schema注册表并不属于Kafka,现在已经有一些开源的schema 注册表实现,比如Confluent Schema Registry。
安装confluent
参考:安装手册
# confluent的安装包中已经包含了zookeeper和kafka的安装包,无需单独再下载
# 下载
curl -O https://packages.confluent.io/archive/7.7/confluent-7.7.1.tar.gz
# 解压
tar -xzf confluent-7.7.1.tar.gz
解压以后目录结构如下:
文件夹 | 描述 |
---|---|
bin | 可执行文件 |
etc | 配置文件 |
lib | 服务 |
libexec | 多平台的客户端库 |
share | jar包和license |
src | 源码 |
# 设置环境变量
vim /etc/profile
export CONFLUENT_HOME=/usr/local/confluent-7.7.1
export PATH=$CONFLUENT_HOME/bin:$PATH
# 加载环境变量
source /etc/profile
# 验证
confluent --help
启动confluent服务
启动zookeeper
cd /usr/local/confluent-7.7.1/etc/kafka
vim zookeeper.properties
# 可以调整zookeeper的端口和数据的存储目录
# 启动zookeeper
./bin/zookeeper-server-start -daemon ./etc/kafka/zookeeper.properties
# 验证
ps -ef | grep zookeeper
启动kafka
cd /usr/local/confluent-7.7.1/etc/kafka
vim server.properties
broker.id=0
# 监听地址
listeners=0.0.0.0://:9092
# 对外暴漏的地址
advertised.listeners=PLAINTEXT://192.168.200.128:9092
# zookeeper的地址
zookeeper.connect=localhost:2181
# 启动./bin/kafka-server-start -daemon ./etc/kafka/server.properties
# 验证
netstat -nap | grep 9092
启动confluent
cd /usr/local/confluent-7.7.1/etc/schema-registry
# 修改schema-registry.properties
vim schema-registry.properties
# schema-registry的监听地址
listeners=http://0.0.0.0:8081
# kafka的访问地址
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
# 启动
./bin/schema-registry-start -daemon ./etc/schema-registry/schema-registry.properties
# 验证
netstat -nap | grep 8081
新建springboot项目
新建avro的schema文件User.avsc
{"namespace": "com.github.xjs.protocol","type": "record","name": "UserRecord","fields": [{"name": "id","type": "int"},{"name": "name","type": "string"}]
}
pom中添加avro-maven-plugin插件
<!--https://avro.apache.org/docs/1.11.1/getting-started-java/-->
<!-- 命令行执行:mvn generate-sources 把avsc转化成java文件
-->
<plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.11.1</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory><outputDirectory>${project.basedir}/src/main/java/</outputDirectory></configuration></execution></executions>
</plugin>
添加avro和kafka的依赖
<dependency><groupId>io.confluent</groupId><artifactId>kafka-avro-serializer</artifactId><version>7.7.1</version>
</dependency>
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
添加对应的配置
server:port: 8080
spring:kafka:bootstrap-servers: 192.168.200.128:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer# 重点关注这里的KafkaAvroSerializervalue-serializer: io.confluent.kafka.serializers.KafkaAvroSerializerconsumer:group-id: testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 重点关注这里的.KafkaAvroDeserializervalue-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializerproperties:# confluent的地址schema.registry.url: http://192.168.200.128:8081
消息生产者
public void send(UserRecord record) {if (Objects.isNull(record)) {return;}log.info("send message, value:{}", record.toString());// 跟发送普通消息一样,可以直接发送UserRecordkafkaTemplate.send("demo-topic", record);
}
消息消费者
@KafkaListener(topics = "demo-topic")
public void consume(ConsumerRecord<String, UserRecord> user){// 跟接收普通消息一样,可以直接接收UserRecordlog.info("receive message, topic:{}, key:{}, value:{}", user.topic(), user.key(), user.value());
}
完整的源码下载:https://github.com/xjs1919/learning-demo/tree/master/springboot-kafka-avro