1.安装环境
kafka环境
参考Docker搭建kafka环境-CSDN博客
xk6-kafka环境
./xk6 build --with github.com/mostafa/xk6-kafka@latest
查看安装情况
2.编写脚本
test_kafka.js
// Either import the module object
import * as kafka from "k6/x/kafka";// Or individual classes and constants
import {Writer,Reader,Connection,SchemaRegistry,SCHEMA_TYPE_STRING,
} from "k6/x/kafka";// Creates a new Writer object to produce messages to Kafka
const writer = new Writer({// WriterConfig objectbrokers: ["localhost:9092"],topic: "my-topic",
});const reader = new Reader({// ReaderConfig objectbrokers: ["localhost:9092"],topic: "my-topic",
});const connection = new Connection({// ConnectionConfig objectaddress: "localhost:9092",
});const schemaRegistry = new SchemaRegistry();
// Can accept a SchemaRegistryConfig objectif (__VU == 0) {// Create a topic on initialization (before producing messages)connection.createTopic({// TopicConfig objecttopic: "my-topic",});
}export default function () {// Fetch the list of all topicsconst topics = connection.listTopics();console.log(topics); // list of topics// Produces message to Kafkawriter.produce({// ProduceConfig objectmessages: [// Message object(s){key: schemaRegistry.serialize({data: "my-key",schemaType: SCHEMA_TYPE_STRING,}),value: schemaRegistry.serialize({data: "my-value",schemaType: SCHEMA_TYPE_STRING,}),},],});// Consume messages from Kafkalet messages = reader.consume({// ConsumeConfig objectlimit: 10,});// your messagesconsole.log(messages);// You can use checks to verify the contents,// length and other properties of the message(s)// To serialize the data back into a string, you should use// the deserialize method of the Schema Registry client. You// can use it inside a check, as shown in the example scripts.let deserializedValue = schemaRegistry.deserialize({data: messages[0].value,schemaType: SCHEMA_TYPE_STRING,});
}export function teardown(data) {// Delete the topicconnection.deleteTopic("my-topic");// Close all connectionswriter.close();reader.close();connection.close();
}
3.运行测试
运作之前先开启kafka服务,打开终端输入命令
./k6 run test_kafka.js --vus 50 --duration 10s
测试结果