java高并发热点数据更新问题怎么解决

发布时间:2023-04-26 16:33:27 作者:iii
来源:亿速云 阅读:100

今天小编给大家分享一下java高并发热点数据更新问题怎么解决的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。

mysql update的时候到底是锁行还是锁表?

InnoDb 锁简单分类

按照数据操作的粒度

1)行级锁:锁住记录行

2)表级锁:锁住整张表

按照对数据操作的类型

1)读锁(共享锁):针对同一份数据,多个读操作可以同时进行而不会互相影响。  

 2) 写锁(排它锁):当前操作没有完成之前,它会阻断其他写锁和读锁。

mysql 在update时会根据where 条件的类型决定锁行还是锁表 where的过滤条件列,如果用索引,锁行,无法用索引,锁表。按照索引规则,如果能使用索引,锁行,不能使用索引,锁表。 行锁是排他锁,当一条记录已经被一条update语句锁住时会阻断其他的update操作,在高并发场景下,对于热点数据来说会进行频繁的更新操作造成其他update操作锁等待超时请求失败

背景

以旅游支付场景为例,伴随着业务量的增加,系统的并发量会逐渐上升,例如“北京长城度假区”的账户流水会变得十分频繁,每次支付或者退款操作都需要去更新一下账户余额,并发较低时并不会有什么问题,但当旅游高峰期到来时并发量上升,数据库更新的时候需要获得数据行锁,在未释放这个行锁之前,其他事务只能是等待。

解决方案

1.支付时异步入账,退款增加一个欠款垫资户

用户支付入款需要给账户加钱时此时商户对于资金的实时性要求不高,追求准确性,因此可以将账户加款放到异步线程池,达到错峰的目的 然而当用户发起退款时,我们必须及时并且准确的从账户扣款,因此退款采取同步进行,退款的订单相对于支付来说量就会少很多,满足要求。但是存在一个问题高并发状态下某一个热点账户余额时刻在变很有可能退款发起时账户余额充足但是实际扣除时由于上一笔支付未入账,造成金额不足

1.加分布式锁

对特定账户加锁,保证某一刻只有一笔退款请求获得该账户的操作权 弊端:多个用时同时退款时只有一笔成功,对用户不友好,pass掉

2.新增垫资商户

热点账户增加一个指定透支额度的垫资户,实际账户余额不足时从垫资户借款,然后定期核对垫资户透支额度从实际账户一次性扣款, 推荐

2.合并请求

合并多条需要更新余额的请求

将一段时间内的请求,先进行阻塞,合并各个账户需要更新的金额,一次性处理,然后将结果拆分,唤醒被阻塞的请求

java高并发热点数据更新问题怎么解决

demo实现

 * @Author: xiaokunkun
 * @CreateTime: 2023-04-23  14:37
 * @Description: 合并更新,可以不捕捉异常报错后外层调用方直接捕获异常事务回滚
 */
@Service
public class CommodityAmountService {
    class Request {
        AcctUpdateDto acctUpdateDto;
        //预留字段 可不使用
        String atomCode;
        //暂定返回结果为true或者false
        CompletableFuture<Boolean> future; // 接受结果
    }
 
    // 积攒请求(每隔N毫秒批量处理一次)
    LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>();
 
    // 定时任务的实现,N秒钟处理一次数据
    @PostConstruct
    public void init() {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            // 1、取出queue的请求,生成一次合并更新
            int size = queue.size();
            if (size == 0) {
                return;
            }
            ArrayList<Request> requests = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                //队列出栈
                Request request = queue.poll();
                requests.add(request);
            }
            System.out.println("批量处理数据量:" + size);
            // 2、组装一个合并更新 key为账户value为sum(amount)
            Map<String, Long> amountMap = new HashMap<>();
            ArrayList<String> commodityCodes = new ArrayList<>();
            for (Request request : requests) {
                //todo 根据accountNo分组
            }
            for (String key : amountMap.keySet()) {
                Long amount = amountMap.get(key);
                //update mysql
            }
            // 3、将结果响应 分发给每一个单独的用户请求。由定时任务处理线程 --> n个用户的请求线程
            for (Request request : requests) {
                // 将结果返回到对应的请求线程,只要不报错此批次全部返回true,否则false
                request.future.complete(true);
            }}, 0, 1000, TimeUnit.MILLISECONDS);
    }
 
    @Autowired
    CommodityRemoteService commodityRemoteService;
 
    // 合并金额并更新,多个用户请求
    public Boolean updateMergeAmount(String movieCode)
            throws ExecutionException, InterruptedException {
        // 并非立刻发起接口调用,请求收集起来,再进行
        Request request = new Request();
        request.atomCode = movieCode;
        // 异步编程:获取异步处理的结果
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        request.future = future;
        queue.add(request);
        return future.get(); // 此处get方法,会阻塞线程运行,直到future有返回
    }
}

测试类:

//模拟500的并发量
public void updateMerge() {
    AcctCmdDriver acctCmdDriver = new AcctCmdDriver();
    TradeAccntOrderDetail detail = new TradeAccntOrderDetail();
    AcctNoInfo acctNoInfo = new AcctNoInfo();
    OrderConsist consistForOrder = OrderConsist.newInstance("0200_202304", "trade_accnt_merchant_order");
    detail.setConsistForOrder(consistForOrder);
    acctCmdDriver.setDetail(detail);
    acctCmdDriver.setAcctNoInfo(acctNoInfo);
    detail.setAccountCategory(AccountCategoryEnum.MERCHANT);
    detail.setAccountNo("02020001010000262977202304");
    detail.setAmount(6l);
    System.out.println("start build thread" + acctCmdDriver);
    Random rand = new Random();
    for (int i = 1; i <= 500; i++) {
        final String index = "code_" + i;
        Thread thread = new Thread(() -> {
            try {
                System.out.println("amount is:" + detail.getAmount());
                countDownLatch.await();
                Thread.sleep(rand.nextInt(150));
                Boolean res = updateMergeAmountService.mergeUpdate(acctCmdDriver);
                System.out.println("current i" + index + "res:" + res);
            } catch (InterruptedException e) {
                System.out.println("thread error is:" + e);
            }
        });
        thread.start();
        // 启动后,倒计时器倒计数减一,代表又有一个线程就绪了
        countDownLatch.countDown();
    }
}

以上就是“java高并发热点数据更新问题怎么解决”这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注亿速云行业资讯频道。

推荐阅读:
  1. Java语言中的抽象类与继承实例代码分析
  2. Java如何通过动态规划设计股票买卖最佳时机

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

java

上一篇:Android数据双向绑定的实现原理和应用场景是什么

下一篇:OpenCV特征匹配和单应性矩阵查找对象怎么实现

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》