当前位置: 首页 > news >正文

【操作幂等和数据一致性】保障业务在MySQL和COS对象存储的一致

业务场景

  1. 发布信息,更新到数据库MySQL
  2. COS操作,更新JSON文件

不过可能存在幂等性和数据一致性的问题。

// 批量存MySQL
entityPublishService.saveOrUpdateBatch(entityPublishList);
// 遍历批量存COS对象存储
   searchEntitys.forEach(req -> {
                    //删除旧的json文件
                    this.cosFileClient.deleteFile(Constant.ResourcePath.RESOURCE_PATH + req.getId());
                    ObjectMapper mapper = new ObjectMapper();
                    String jsonString = null;
                    try {
                        jsonString = mapper.writeValueAsString(req);
                        // 将 JSON 字符串转换为 InputStream 并上传
                        InputStream inputStream = new ByteArrayInputStream(jsonString.getBytes());
                        this.cosFileClient.uploadFile(Constant.ResourcePath.RESOURCE_PATH + req.getId(), inputStream);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }); 

数据一致性与幂等

  • 幂等性指的是无论操作执行多少次,结果都一致。
    比如,如果这段代码被重复执行,应该不会导致数据错误或重复上传。
  • 数据一致性则要确保数据库和COS中的JSON文件状态一致,
    比如更新数据库后,必须成功上传新的JSON,否则应该回滚或处理失败情况。

业务潜在问题

当前的代码在保存或更新数据库后,遍历entitys,删除旧的JSON,然后生成新的并上传。这里有几个潜在的问题:

  1. 数据库操作和文件操作之间没有事务管理,如果上传文件失败,数据库已经提交了事务,导致数据不一致。

  2. 删除旧文件后,如果上传新文件失败,会导致数据丢失。

  3. 如果方法被重复调用,可能导致多次上传或删除,不幂等。

寻找解决方案

  • 可能的解决方案是,将数据库的保存操作拆分为每个实体的单独操作,并在每个实体处理时,先上传文件,再更新数据库。这样,每个实体的处理是原子性的:上传文件成功,然后更新数据库。如果上传失败,数据库不更新。但这样会影响性能,因为批量操作变为单个操作。

  • 为了数据一致性,可能需要在上传失败时,提供某种回滚机制,比如删除已更新的数据库记录。但这在批量操作中较为困难。
    因此可能需要业务层面的处理,比如标记记录为待定状态,或者记录操作日志,用于后续的补偿

  • 正确的顺序应该是先保存数据库,再上传文件,这样即使上传失败,数据库已经更新,但文件未更新,其他系统可能无法获取最新数据,但至少数据库是正确的,可以触发后续的修复机制

后续修复或补偿机制,比如在RabbitMQ消费者中业务,配置重试机制。人工操作记录人工补偿机制

为了确保操作的幂等性和数据一致性,尤其是批量操作,代码改进:

  1. 移除不必要的旧文件删除操作:直接通过覆盖上传实现更新,避免删除后上传失败导致的数据丢失。
  2. 添加上传重试机制:提高上传成功率,减少因临时问题导致的失败。
  3. 异常处理和事务分离:确保数据库操作后尽可能完成文件上传,若失败则通过异常通知上层逻辑处理。
// 批量存储MySQL
entityPublishService.saveOrUpdateBatch(entityPublishList);

// 遍历批量对象存储COS
entitys.forEach(req -> {
    ObjectMapper mapper = new ObjectMapper();
    String jsonString;
    InputStream inputStream = null;
    final int maxRetries = 3;
    int retryCount = 0;
    boolean uploaded = false;
    
    while (retryCount < maxRetries && !uploaded) {
        try {
            // 序列化JSON数据
            jsonString = mapper.writeValueAsString(req);
            inputStream = new ByteArrayInputStream(jsonString.getBytes(StandardCharsets.UTF_8));
            
            // 幂等上传(覆盖模式)
            this.cosFileClient.uploadFile(
                Constant.ResourcePath.RESOURCE_PATH + req.getId(), 
                inputStream
            );
            uploaded = true; // 标记上传成功
            
        } catch (JsonProcessingException e) {
            // JSON序列化失败,无需重试
            throw new RuntimeException("JSON序列化失败,ID: " + req.getId(), e);
        } catch (Exception e) {
            retryCount++;
            if (retryCount >= maxRetries) {
                throw new RuntimeException("文件上传失败,超过最大重试次数,ID: " + req.getId(), e);
            }
            // 指数退避等待
            try {
                Thread.sleep(1000 * retryCount);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("上传操作被中断,ID: " + req.getId(), ie);
            }
        } finally {
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    // 日志记录关闭异常,但不影响主流程
                }
            }
        }
    }
    
    if (!uploaded) {
        // 此处可根据业务需要记录失败状态或触发补偿机制
        throw new RuntimeException("上传未完成,ID: " + req.getId());
    }
});

改进说明

  1. 幂等性保障

    • 覆盖上传:直接调用uploadFile覆盖同名文件,多次执行结果一致。
    • 数据库操作saveOrUpdateBatch本身应设计为幂等(如使用唯一约束或主键更新)。
  2. 数据一致性措施

    • 先持久化数据库:确保数据库更新后再同步文件,符合业务逻辑依赖。
    • 重试机制:通过最多3次重试(含退避等待),减少网络抖动等临时问题的影响。
    • 异常处理:上传失败时抛出异常,通知调用方处理(如日志记录、告警、人工干预或事务补偿)。
  3. 异常情况处理

    • 序列化错误:直接终止,无需重试(数据问题需修复)。
    • 上传失败:重试后仍失败则向上抛出,由系统统一处理(如标记数据状态、异步任务修复)。

注意事项

  • 若业务允许最终一致性,可考虑将上传失败记录至持久化队列,由后台任务异步重试。
  • 数据库设计可增加lastUpdateTime字段,确保重复提交时数据版本一致。
  • 监控上传失败异常,及时处理以保证系统健康度。

相关文章:

  • 大白话实战Gateway
  • 如何优化企业网络架构以提高性能和安全性?
  • Mac 清理缓存,提高内存空间
  • VTK 距离可视化 PolyDataDistance DistancePolyDataFilter
  • 使用 FFmpeg 剪辑视频指南
  • 分布式光纤声波振动技术在钻井泄漏检测中的应用
  • 【UCB CS 61B SP24】Lecture 4 - Lists 2: SLLists学习笔记
  • 在PyCharm中运行Jupyter Notebook的.ipynb文件及其pycharm软件的基础使用
  • 前端循环全解析:JS/ES/TS 循环写法与实战示例
  • windows解压多个文件夹内的zip文件脚本
  • 计算机三级网络技术知识汇总【4】
  • 1005 K 次取反后最大化的数组和(贪心)
  • Electron通过ffi-napi调用dll导出接口
  • MacOS安装Emacs
  • 【Mermaid图表渲染错误分析与解决方案之关键字错误】
  • 读书笔记:要点提炼《基于大模型的RAG应用开发与优化——构建企业级LLM应用》(严灿平)
  • 计算机科学与技术
  • GPT2 模型训练
  • 本地文件共享——HFS
  • Linux线程池
  • 由重商主义观察世界现代化历程
  • 当哲学与戏剧作为一种生活方式——《人生六戏》分享会
  • 青海一只人工繁育秃鹫雏鸟破壳后脱险成活,有望填补国内空白
  • 巴基斯坦召开国家安全委员会紧急会议,应对印方连环举措
  • 上海楼市明显复苏:一季度房地产开发投资增长5.1%,土地市场重燃战火
  • 土耳其发生6.2级地震,震源深度10千米