早几年测试时序库时,采集数据到kafka,然后用不同数据进行存储验证。Influxdb是花时间比较多的,它的数据建模方法、读写方法都需要使用特殊的API。时间久了自己也经常忘记,把当时的测试关键代码记录下来,也方便日后查找。
代码基于java编写。
1、接口数据定义,clientid+tag组合必须唯一
public class KafkaInfo{//客户端idpublic String clientid;//测点名称public String tag ;//数据时戳public String ts ;//测点数据类型public String vt;//测点值public String value;//本次数据更新次数public long updatacount;//测点说明public String desc;
}
2、数据写入
static Logger logger = Logger.getLogger(influxdbApplicationTest.class);// You can generate a Token from the "Tokens Tab" in the UIstatic String token = "web界面创建的token";static String bucket = "数据分库名";static String org = "初始化时创建的org";public static void main(String[] args) {logger.info("-------start ,ApplicationTest--------");InfluxDBClient client = InfluxDBClientFactory.create("http://10.126.12.113:8086", token.toCharArray());//// Write data//try (WriteApi writeApi = client.getWriteApi()) {//// Write by Data Point//for ( int i = 0;i < 10; i ++ ) {Point point = Point.measurement("Line123").addTag("tag", "Tags001").addTag("L1", "real").addField("value", 20*i).addField("update", 7000+i).time(Instant.now().toEpochMilli(), WritePrecision.MS);//writeApi.writePoint(bucket, org, point);}}//// Query data//String query = String.format("from(bucket: \"%s\") |> range(start: -1h) |> filter(fn: (r) =>r._measurement==\"Line123\"", bucket);QueryApi queryApi = client.getQueryApi();List<FluxTable> tables = queryApi.query(query,org);for (FluxTable fluxTable : tables) {List<FluxRecord> records = fluxTable.getRecords();for (FluxRecord fluxRecord : records) {System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getMeasurement()+","+ fluxRecord.getValueByKey("tag") +","+ fluxRecord.getValueByKey("_value"));}}client.close();logger.info("-------finish ,ApplicationTest--------");}