目录
使用stock_job工程采集到国内大盘的最新交易时间的信息并插入数据库,使用rabbitmq发送消息
1.导入依赖
2.编写yml文件,配置连接rabbitmq的信息
3.编写mq的配置类,生成交换机,消息队列,并将他们绑定
4.采集最新到最新的国内大盘数据信息,并插入到数据时时,发送消息
5.查看消息队列是否存在消息
在stock_backend工程中定义消息监听类,并配置本地缓存
1.导入mq依赖和caffeine
2.编写yml文件
3.编写caffeine的配置类,和mq的配置类
4.编写消息监听类
5. stockService.getInnerMarket()方法
6.debug启动SpringBoot引导类
自动跳到消息监听类
清空缓存,然后调用mapper方法重新向数据库查询最新的国内大盘数据,再重新保存到本地缓存中
编辑 查看caffeineCache成员变量的值
成功缓存key为innerMarketInfosKey
的数据
当我们在查询最新的股票大盘数据时,我们会频繁的向mysql查询数据,会给mysql造成很大的压力,所以我们可以使用caffeineCache本地缓存。
大致思路:
我们先使用stock_job工程采集到国内大盘的最新交易时间的信息时并将数据插入数据库,使用rabbitmq发送消息(消息为当前时间)
在stock_backend工程定义消息队列监听类,如果接收到的时间和发送消息的时间相差一分钟时,就报错,不超过一分钟,就清除之前的本地缓存,再通过mapper向数据库查询数据,然后再重新把最新的国内大盘数据存入本地缓存中。
这样一来,如果stock_job工程没有向数据库中插入最新交易时间的国内大盘数据信息,在stock_backend中查询最新交易时间的国内大盘数据信息时,就会直接从本地缓存中获取数据信息。
使用stock_job工程采集到国内大盘的最新交易时间的信息并插入数据库,使用rabbitmq发送消息
1.导入依赖
<!-- 导入mq依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2.编写yml文件,配置连接rabbitmq的信息
spring:rabbitmq:host: 192.168.230.100 # rabbitMQ的ip地址port: 5672 # 端口username: hhhpassword: 1234virtual-host: /
3.编写mq的配置类,生成交换机,消息队列,并将他们绑定
@Configuration
public class MqConfig {/*** 重新定义消息序列化的方式,改为基于json格式序列化和反序列化*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}/*** 国内大盘信息队列*/@Beanpublic Queue innerMarketQueue() {return new Queue("innerMarketQueue", true);}/*** 定义路由股票信息的交换机*/@Beanpublic TopicExchange innerMarketTopicExchange() {return new TopicExchange("stockExchange", true, false);}/*** 绑定队列到指定交换机*/@Beanpublic Binding bindingInnerMarketExchange() {return BindingBuilder.bind(innerMarketQueue()).to(innerMarketTopicExchange()).with("inner.market");//设置routingKey}
}
4.采集最新到最新的国内大盘数据信息,并插入到数据时时,发送消息
@Overridepublic void getInnerMarketInfo() {//......//解析的数据批量插入数据库int count= stockMarketIndexInfoMapper.insertBatch(entities);log.info("当前插入了:{}行数据",count);//通知后台终端刷新本地缓存,发送的日期数据是告知对方当前更新的股票数据所在时间点rabbitTemplate.convertAndSend("stockExchange","inner.market",new Date());}
5.查看消息队列是否存在消息
在stock_backend工程中定义消息监听类,并配置本地缓存
1.导入mq依赖和caffeine
<dependencies><!-- 导入mq依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
<!-- 本地缓存依赖--><dependency><groupId>com.github.ben-manes.caffeine</groupId><artifactId>caffeine</artifactId></dependency>
2.编写yml文件
spring:rabbitmq:host: 192.168.230.130 # rabbitMQ的ip地址port: 5672 # 端口username: hhhpassword: 1234virtual-host: /
3.编写caffeine的配置类,和mq的配置类
/*** 构建缓存bean* @return*/@Beanpublic Cache<String,Object> caffeineCache(){Cache<String, Object> cache = Caffeine.newBuilder().maximumSize(200)//设置缓存数量上限
// .expireAfterAccess(1, TimeUnit.SECONDS)//访问1秒后删除
// .expireAfterWrite(1,TimeUnit.SECONDS)//写入1秒后删除.initialCapacity(100)// 初始的缓存空间大小.recordStats()//开启统计.build();return cache;}
这里不用定义交换机和消息队列
@Configuration
public class MqConfig {/*** 重新定义消息序列化的方式,改为基于json格式序列化和反序列化*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
}
4.编写消息监听类
@Component
@Slf4j
public class MqListener {@Autowiredprivate Cache<String,Object> caffeineCache;@Autowiredprivate StockService stockService;@RabbitListener(queues = "innerMarketQueue")public void acceptInnerMarketInfo(Date date){//消息队列里的数据类型是Date,所以接收的参数类型也是Datelong differTime = DateTime.now().getMillis() - new DateTime(date).getMillis();if(differTime>60000L){log.error("采集国内大盘时间点:{},同步超时:{}ms",new DateTime(date).toString("yyyy-MM-dd HH:mm:ss"),differTime);}//发送信息和接收信息在一分钟以内//删除key为innerMarketInfosKey的缓存caffeineCache.invalidate("innerMarketInfosKey");//重新获取数据//调用服务更新缓存stockService.getInnerMarket();}
}
5. stockService.getInnerMarket()方法
/*** 获取国内大盘最新的数据* @return*/@Overridepublic R<List<InnerMarketDomain>> getInnerMarket() {//获取key为innerMarketInfosKey的本地缓存数据,如果不存在,就去数据库中查询数据,并存入本地缓存中//本地缓存默认一分钟消失R<List<InnerMarketDomain>> result= (R<List<InnerMarketDomain>>) caffeineCache.get("innerMarketInfosKey", key->{//1.获取当前时间的最新交易点(精确到分钟,秒和毫秒置为0)Date curDate = DateTimeUtil.getLastDate4Stock(DateTime.now()).toDate();//Date curDate = MyDateTimeUtil.getLateDate4Stock(DateTime.now()).toDate();//mock data 等后续股票采集job工程完成,再将此代码删除curDate=DateTime.parse("2021-12-28 09:31:00", DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toDate();//log.info("curDate:{}",curDate);//2.获取国内大盘的编码集合List<String> mcodes = stockInfoConfig.getInner();//3.调用mapper进行查询List<InnerMarketDomain> data=stockMarketIndexInfoMapper.getInnerMarket(curDate,mcodes);//4.返回数据return R.ok(data);});return result;}