分布式多线程性能调优
使用多线程优化接口
//下单业务public Object order( long userId){long start = System.currentTimeMillis();//方法的开始时间戳(ms)JSONObject orderInfo = remoteService.createOrder(userId);Callable<JSONObject> callable1 = new Callable<JSONObject>() {@Overridepublic JSONObject call() throws Exception {JSONObject goodsInfo = remoteService.dealGoods(orderInfo);return goodsInfo;}};Callable<JSONObject> callable2 = new Callable<JSONObject>() {@Overridepublic JSONObject call() throws Exception {JSONObject pointsInfo = remoteService.dealPoints(orderInfo);return pointsInfo;}};Callable<JSONObject> callable3 = new Callable<JSONObject>() {@Overridepublic JSONObject call() throws Exception {JSONObject deliverInfo = remoteService.dealDeliver(orderInfo);return deliverInfo;}};LeeFutureTask<JSONObject> task1 = new LeeFutureTask(callable1);LeeFutureTask<JSONObject> task2 = new LeeFutureTask(callable2);LeeFutureTask<JSONObject> task3 = new LeeFutureTask(callable3);Thread thread1 =new Thread(task1);Thread thread2 =new Thread(task2);Thread thread3 =new Thread(task3);thread1.start();thread2.start();thread3.start();try {orderInfo.putAll(task1.get());orderInfo.putAll(task2.get());orderInfo.putAll(task3.get());} catch (InterruptedException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);}long end = System.currentTimeMillis();//方法的开始时间戳(ms)System.out.println(end-start);//打印这个方法的执行时间return orderInfo;}
后台批处理的优化
public JSONObject orderFastbatch (long userId) throws Exception{JSONObject orderInfo = remoteService.createOrderFast(userId);//JSONObject goodsInfo = remoteService.dealGoodsFast(orderInfo); //这里是单个的请求,想做成批量+异步的方式//orderInfo.putAll(goodsInfo);CompletableFuture<JSONObject> future = new CompletableFuture<>();Request request = new Request();request.future =future;request.object = orderInfo;queue.add(request);return future.get(); //这里类似于FutureTask 的get方法,去异步的方式拿结果}//定义一个JUC中的MQLinkedBlockingQueue<Request> queue = new LinkedBlockingQueue();class Request{JSONObject object;CompletableFuture<JSONObject> future;}@PostConstructpublic void DoBiz(){ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);executorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {int size =queue.size();if(size ==0) return;//没有数据在queue中,定时任务不需要执行什么if(size >1000) size=1000;//这里限制每次批量封装的最多是1000List<JSONObject> ListJSONRepuest = new ArrayList<>();List<Request> ListRequest = new ArrayList<>();for( int i =0 ;i <size;i++){Request request =queue.poll();//从MQ中拉取ListRequest.add(request);ListJSONRepuest.add(request.object);}//调用了多少次批量接口List<JSONObject> ListJSONReponse = remoteService.dealGoodsFastBatch(ListJSONRepuest);System.out.println("调用批量接口,本地组装的数据:"+size+"条");for(JSONObject JSONReponse:ListJSONReponse){//这里可以使用hashmap 的方式减少一轮遍历。for(Request request:ListRequest){String request_OrderId =request.object.get("orderId").toString();String response_OrderId =JSONReponse.get("orderId").toString();if(request_OrderId.equals(response_OrderId)){request.future.complete(JSONReponse);}}}}}, 3000, 50, TimeUnit.MILLISECONDS);}
//处理库存信息 (批量接口) 1000,public List<JSONObject> dealGoodsFastBatch( List<JSONObject> objectList) {List<JSONObject> list = objectList;Iterator it = list.iterator();while(it.hasNext()){JSONObject result =(JSONObject)it.next();result.put("dealGoods", "ok");}return list;}
批处理与MySQL的综合性优化
如果做了批量处理的话。
1W个请求,高并发请求,其实里面针对的修改库存会集中在一些热点数据8000个在一个商品。
应用层基于批量的接口进行优化。
伪代码:
for循环遍历list,把所有相同的goods_id放到hashmap进行扣减计数即可。
分布式锁
单机锁:sync、lock
MySQL去实现分布式锁–for update (行锁)
Redis
分布式锁的第一种常见方案:Redis来实现分布式锁。Redis key-value键值对的数据库–内存。
Redis的分布式锁的实现逻辑:
1、加锁,setnx key value
1)为了避免死锁问题setnx完之后设置TTL失效时间
2)为了TTL的失效的时候业务还未完成导致的多个应用抢到锁的BUG,这里可以使用一个守护线程,来不断的去续锁(延长key的TTL)
2、解锁del key
无论是加锁,还是解锁,这里涉及到多个命令。要解决原子性问题:
1、复合命令实现加锁。set lock 9527 ex 10 nx
2、解锁的逻辑中:在del之前一定要判断:只有持有锁的应用或线程,才能去解锁成功,否则都是失败。value做文章。存一个唯一标识。
if (get ==应用的保存的值)del
else释放锁失败
使用Lua脚本
锁的可重入。A抢到了锁,没有释放锁之前,依然可以lock进入加锁逻辑的。
Zookeeper
1、连接ZK、创建分布式锁的根节点/lock
2、一个线程获取锁时,create ()在/lock西面创建1个临时顺序节点3、使用getChildren()获取当前所有字节点,并且对子节点进行排序
4、判断当前创建的节点是否是所有子节点中最小的节点,如果是,则获取锁->执行对应的业务逻辑->在结束的时候使用delete()方法删除该节点。
否则需要等待其他的节点的释放锁、
5、当一个线程需要释放锁的,删除该节点,其他的线程获取当前节点前的一个节点来获取锁。