个人编制软件展示

PSI - Purchase Sale Inventory 进销存软件

PSI进销存系统订单实时处理与并发控制

引言

在进销存系统中,订单处理是最核心的业务场景。当多个用户同时下单、库存扣减时,如何保证数据的一致性,避免超卖问题是系统设计的关键挑战。本文介绍订单实时处理与并发控制的最佳实践。

并发问题场景分析

进销存系统中常见的并发问题:

场景 问题描述 影响
库存超卖 多个请求同时读取库存,库存不足仍被扣减 库存为负数,无法发货
重复下单 用户快速点击多次,创建多个订单 重复扣款,财务混乱
金额计算错误 同时计算价格,优惠叠加错误 订单金额不正确
库存扣减与订单状态不一致 库存扣减成功,订单创建失败 数据不一致

库存扣减方案

方案一:数据库乐观锁

-- 库存表设计
CREATE TABLE `product_stock` (
  `id` BIGINT PRIMARY KEY AUTO_INCREMENT,
  `product_id` BIGINT NOT NULL,
  `warehouse_id` BIGINT NOT NULL,
  `quantity` INT NOT NULL DEFAULT 0,
  `version` INT NOT NULL DEFAULT 0,  -- 版本号,用于乐观锁
  `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  UNIQUE KEY `uk_product_warehouse` (`product_id`, `warehouse_id`)
) ENGINE=InnoDB;

-- 乐观锁扣减库存
UPDATE product_stock
SET quantity = quantity - #{deductQty},
    version = version + 1
WHERE product_id = #{productId}
  AND warehouse_id = #{warehouseId}
  AND quantity >= #{deductQty}
  AND version = #{currentVersion};

-- 检查影响行数
-- 如果返回影响行数为0,则说明库存不足或版本冲突

方案二:数据库悲观锁

-- 悲观锁方式扣减库存(事务内)
START TRANSACTION;

-- 查询并锁定库存记录
SELECT quantity INTO @currentQty
FROM product_stock
WHERE product_id = #{productId}
  AND warehouse_id = #{warehouseId}
FOR UPDATE;

-- 检查库存是否足够
IF @currentQty >= #{deductQty} THEN
  -- 扣减库存
  UPDATE product_stock
  SET quantity = quantity - #{deductQty}
  WHERE product_id = #{productId}
    AND warehouse_id = #{warehouseId};

  COMMIT;
  SELECT 'SUCCESS' AS result;
ELSE
  ROLLBACK;
  SELECT 'INSUFFICIENT_STOCK' AS result;
END IF;

方案三:Redis 分布式锁

// Redis 分布式锁
class RedisLock {
  constructor(redisClient) {
    this.redis = redisClient;
    this.defaultTimeout = 30; // 默认30秒超时
  }

  // 获取锁
  async acquire(key, value, timeout = this.defaultTimeout) {
    const result = await this.redis.set(
      `lock:${key}`,
      value,
      'EX', timeout,
      'NX' // 键不存在时才设置
    );
    return result === 'OK';
  }

  // 释放锁(使用 Lua 脚本保证原子性)
  async release(key, value) {
    const script = `
      if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
      else
        return 0
      end
    `;
    await this.redis.eval(script, 1, `lock:${key}`, value);
  }

  // 尝试获取锁(带重试)
  async tryAcquire(key, value, retryCount = 3, retryDelay = 100) {
    for (let i = 0; i < retryCount; i++) {
      if (await this.acquire(key, value)) {
        return true;
      }
      await this.sleep(retryDelay);
    }
    return false;
  }
}

// 库存扣减服务
class StockService {
  constructor(redisClient, lock, mysql) {
    this.redis = redisClient;
    this.lock = lock;
    this.mysql = mysql;
  }

  // 扣减库存(分布式锁方案)
  async deductStock(productId, warehouseId, quantity) {
    const lockKey = `stock:${productId}:${warehouseId}`;
    const lockValue = uuid.v4();

    try {
      // 1. 获取分布式锁
      const acquired = await this.lock.tryAcquire(lockKey, lockValue);
      if (!acquired) {
        throw new Error('系统繁忙,请稍后重试');
      }

      // 2. 从 Redis 获取库存
      let stock = await this.redis.get(`stock:${productId}:${warehouseId}`);
      stock = parseInt(stock) || 0;

      // 3. 检查库存
      if (stock < quantity) {
        throw new Error('库存不足');
      }

      // 4. 扣减 Redis 库存
      await this.redis.decrby(`stock:${productId}:${warehouseId}`, quantity);

      // 5. 异步同步到数据库(最终一致性)
      await this.syncStockToDb(productId, warehouseId, quantity);

      return { success: true, remainingStock: stock - quantity };

    } finally {
      // 6. 释放锁
      await this.lock.release(lockKey, lockValue);
    }
  }

  // 异步同步到数据库
  async syncStockToDb(productId, warehouseId, quantity) {
    await this.redis.lpush('stock_sync_queue', JSON.stringify({
      productId, warehouseId, quantity, timestamp: Date.now()
    }));
  }
}

订单创建流程

// 订单创建服务
class OrderService {
  constructor(orderModel, stockService, lock) {
    this.Order = orderModel;
    this.Stock = stockService;
    this.Lock = lock;
  }

  // 创建订单(带防重复提交)
  async createOrder(userId, items, paymentMethod) {
    const orderNo = this.generateOrderNo();
    const lockKey = `order:${userId}`;

    // 1. 获取用户锁,防止重复提交
    const lockAcquired = await this.Lock.acquire(lockKey, orderNo, 5);
    if (!lockAcquired) {
      throw new Error('订单提交中,请勿重复提交');
    }

    try {
      // 2. 开启事务
      await this.mysql.beginTransaction();

      // 3. 验证商品和库存
      const validatedItems = await this.validateItems(items);

      // 4. 计算订单金额
      const { totalAmount, discount, finalAmount } =
        await this.calculateOrderAmount(validatedItems, userId);

      // 5. 锁定库存(乐观锁)
      const stockResults = [];
      for (const item of validatedItems) {
        const result = await this.Stock.deductWithOptimisticLock(
          item.productId,
          item.warehouseId,
          item.quantity
        );
        if (!result.success) {
          throw new Error(`商品${item.productName}库存不足`);
        }
        stockResults.push(result);
      }

      // 6. 创建订单
      const order = await this.Order.create({
        orderNo,
        userId,
        items: validatedItems,
        totalAmount,
        discount,
        finalAmount,
        paymentMethod,
        status: 'pending_payment',
        createdAt: new Date()
      });

      // 7. 记录库存扣减流水
      await this.Stock.recordStockLog(order.id, stockResults, 'order_deduct');

      // 8. 提交事务
      await this.mysql.commit();

      // 9. 发送订单创建成功消息
      await this.sendOrderCreatedMessage(order);

      return order;

    } catch (error) {
      // 回滚事务
      await this.mysql.rollback();

      // 如果库存已扣减,需要回滚
      await this.rollbackStockDeduction(items);

      throw error;
    } finally {
      // 释放用户锁
      await this.Lock.release(lockKey, orderNo);
    }
  }

  // 乐观锁扣减库存
  async deductWithOptimisticLock(productId, warehouseId, quantity) {
    const maxRetries = 3;

    for (let i = 0; i < maxRetries; i++) {
      // 查询当前库存和版本
      const stock = await this.Stock.findByProductWarehouse(productId, warehouseId);

      if (!stock || stock.quantity < quantity) {
        return { success: false, reason: '库存不足' };
      }

      // 尝试扣减
      const result = await this.mysql.query(`
        UPDATE product_stock
        SET quantity = quantity - ?,
            version = version + 1
        WHERE product_id = ?
          AND warehouse_id = ?
          AND quantity >= ?
          AND version = ?
      `, [quantity, productId, warehouseId, quantity, stock.version]);

      if (result.affectedRows > 0) {
        return {
          success: true,
          remainingStock: stock.quantity - quantity,
          newVersion: stock.version + 1
        };
      }

      // 版本冲突,重试
      console.log(`Stock deduction conflict, retry ${i + 1}`);
      await this.sleep(50);
    }

    return { success: false, reason: '库存扣减失败,请重试' };
  }

  // 生成订单号
  generateOrderNo() {
    const timestamp = Date.now();
    const random = Math.floor(Math.random() * 10000).toString().padStart(4, '0');
    return `PSI${timestamp}${random}`;
  }

  // 验证商品有效性
  async validateItems(items) {
    const validatedItems = [];

    for (const item of items) {
      const product = await this.Product.findById(item.productId);

      if (!product || product.status !== 'active') {
        throw new Error(`商品${item.productId}已下架`);
      }

      if (product.minQuantity > item.quantity) {
        throw new Error(`商品${item.productName}起订量为${product.minQuantity}`);
      }

      validatedItems.push({
        ...item,
        productName: product.name,
        unitPrice: product.price,
        product
      });
    }

    return validatedItems;
  }
}

高并发优化策略

策略 实现方式 效果
库存预热 系统启动时加载库存到 Redis 减少数据库查询,提升响应速度
异步下单 下单请求进入队列,异步处理 削峰填谷,缓解并发压力
请求限流 使用计数器或令牌桶限流 防止系统过载
热点商品隔离 针对秒杀商品单独处理 避免影响普通商品
防重复提交 前端防抖 + 后端幂等校验 避免重复创建订单

库存回滚机制

// 订单取消和库存回滚
class OrderCancelService {
  constructor(orderModel, stockService) {
    this.Order = orderModel;
    this.Stock = stockService;
  }

  // 取消订单并回滚库存
  async cancelOrder(orderId, cancelReason) {
    const order = await this.Order.findById(orderId);

    if (!order) {
      throw new Error('订单不存在');
    }

    if (!this.canCancel(order)) {
      throw new Error('该订单不可取消');
    }

    // 开启事务
    await this.mysql.beginTransaction();

    try {
      // 1. 更新订单状态
      await this.Order.update(orderId, {
        status: 'cancelled',
        cancelReason,
        cancelledAt: new Date()
      });

      // 2. 回滚库存(使用相同的乐观锁机制)
      for (const item of order.items) {
        const result = await this.Stock.increaseWithOptimisticLock(
          item.productId,
          item.warehouseId,
          item.quantity
        );

        if (!result.success) {
          throw new Error(`库存回滚失败:${item.productName}`);
        }
      }

      // 3. 记录库存变动流水
      await this.Stock.recordStockLog(orderId, order.items, 'order_cancel');

      // 4. 如果已支付,退款处理
      if (order.paymentStatus === 'paid') {
        await this.processRefund(order);
      }

      await this.mysql.commit();
      return true;

    } catch (error) {
      await this.mysql.rollback();
      throw error;
    }
  }

  // 定时任务:自动取消超时未支付订单
  async autoCancelExpiredOrders() {
    const expiredOrders = await this.Order.findExpiredOrders();

    for (const order of expiredOrders) {
      try {
        await this.cancelOrder(order.id, '超时未支付自动取消');
        console.log(`Order ${order.orderNo} auto cancelled`);
      } catch (error) {
        console.error(`Cancel order ${order.orderNo} failed:`, error);
      }
    }
  }
}

总结

订单实时处理与并发控制的核心要点:

通过合理的并发控制设计,可以确保在高并发场景下订单处理的正确性和系统的稳定性。

← 下一篇:PSI进销存系统高可用架构与灾备方案设计