PSI进销存系统订单实时处理与并发控制
引言
在进销存系统中,订单处理是最核心的业务场景。当多个用户同时下单、库存扣减时,如何保证数据的一致性,避免超卖问题是系统设计的关键挑战。本文介绍订单实时处理与并发控制的最佳实践。
并发问题场景分析
进销存系统中常见的并发问题:
| 场景 | 问题描述 | 影响 |
|---|---|---|
| 库存超卖 | 多个请求同时读取库存,库存不足仍被扣减 | 库存为负数,无法发货 |
| 重复下单 | 用户快速点击多次,创建多个订单 | 重复扣款,财务混乱 |
| 金额计算错误 | 同时计算价格,优惠叠加错误 | 订单金额不正确 |
| 库存扣减与订单状态不一致 | 库存扣减成功,订单创建失败 | 数据不一致 |
库存扣减方案
方案一:数据库乐观锁
-- 库存表设计
CREATE TABLE `product_stock` (
`id` BIGINT PRIMARY KEY AUTO_INCREMENT,
`product_id` BIGINT NOT NULL,
`warehouse_id` BIGINT NOT NULL,
`quantity` INT NOT NULL DEFAULT 0,
`version` INT NOT NULL DEFAULT 0, -- 版本号,用于乐观锁
`updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY `uk_product_warehouse` (`product_id`, `warehouse_id`)
) ENGINE=InnoDB;
-- 乐观锁扣减库存
UPDATE product_stock
SET quantity = quantity - #{deductQty},
version = version + 1
WHERE product_id = #{productId}
AND warehouse_id = #{warehouseId}
AND quantity >= #{deductQty}
AND version = #{currentVersion};
-- 检查影响行数
-- 如果返回影响行数为0,则说明库存不足或版本冲突
方案二:数据库悲观锁
-- 悲观锁方式扣减库存(事务内)
START TRANSACTION;
-- 查询并锁定库存记录
SELECT quantity INTO @currentQty
FROM product_stock
WHERE product_id = #{productId}
AND warehouse_id = #{warehouseId}
FOR UPDATE;
-- 检查库存是否足够
IF @currentQty >= #{deductQty} THEN
-- 扣减库存
UPDATE product_stock
SET quantity = quantity - #{deductQty}
WHERE product_id = #{productId}
AND warehouse_id = #{warehouseId};
COMMIT;
SELECT 'SUCCESS' AS result;
ELSE
ROLLBACK;
SELECT 'INSUFFICIENT_STOCK' AS result;
END IF;
方案三:Redis 分布式锁
// Redis 分布式锁
class RedisLock {
constructor(redisClient) {
this.redis = redisClient;
this.defaultTimeout = 30; // 默认30秒超时
}
// 获取锁
async acquire(key, value, timeout = this.defaultTimeout) {
const result = await this.redis.set(
`lock:${key}`,
value,
'EX', timeout,
'NX' // 键不存在时才设置
);
return result === 'OK';
}
// 释放锁(使用 Lua 脚本保证原子性)
async release(key, value) {
const script = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;
await this.redis.eval(script, 1, `lock:${key}`, value);
}
// 尝试获取锁(带重试)
async tryAcquire(key, value, retryCount = 3, retryDelay = 100) {
for (let i = 0; i < retryCount; i++) {
if (await this.acquire(key, value)) {
return true;
}
await this.sleep(retryDelay);
}
return false;
}
}
// 库存扣减服务
class StockService {
constructor(redisClient, lock, mysql) {
this.redis = redisClient;
this.lock = lock;
this.mysql = mysql;
}
// 扣减库存(分布式锁方案)
async deductStock(productId, warehouseId, quantity) {
const lockKey = `stock:${productId}:${warehouseId}`;
const lockValue = uuid.v4();
try {
// 1. 获取分布式锁
const acquired = await this.lock.tryAcquire(lockKey, lockValue);
if (!acquired) {
throw new Error('系统繁忙,请稍后重试');
}
// 2. 从 Redis 获取库存
let stock = await this.redis.get(`stock:${productId}:${warehouseId}`);
stock = parseInt(stock) || 0;
// 3. 检查库存
if (stock < quantity) {
throw new Error('库存不足');
}
// 4. 扣减 Redis 库存
await this.redis.decrby(`stock:${productId}:${warehouseId}`, quantity);
// 5. 异步同步到数据库(最终一致性)
await this.syncStockToDb(productId, warehouseId, quantity);
return { success: true, remainingStock: stock - quantity };
} finally {
// 6. 释放锁
await this.lock.release(lockKey, lockValue);
}
}
// 异步同步到数据库
async syncStockToDb(productId, warehouseId, quantity) {
await this.redis.lpush('stock_sync_queue', JSON.stringify({
productId, warehouseId, quantity, timestamp: Date.now()
}));
}
}
订单创建流程
// 订单创建服务
class OrderService {
constructor(orderModel, stockService, lock) {
this.Order = orderModel;
this.Stock = stockService;
this.Lock = lock;
}
// 创建订单(带防重复提交)
async createOrder(userId, items, paymentMethod) {
const orderNo = this.generateOrderNo();
const lockKey = `order:${userId}`;
// 1. 获取用户锁,防止重复提交
const lockAcquired = await this.Lock.acquire(lockKey, orderNo, 5);
if (!lockAcquired) {
throw new Error('订单提交中,请勿重复提交');
}
try {
// 2. 开启事务
await this.mysql.beginTransaction();
// 3. 验证商品和库存
const validatedItems = await this.validateItems(items);
// 4. 计算订单金额
const { totalAmount, discount, finalAmount } =
await this.calculateOrderAmount(validatedItems, userId);
// 5. 锁定库存(乐观锁)
const stockResults = [];
for (const item of validatedItems) {
const result = await this.Stock.deductWithOptimisticLock(
item.productId,
item.warehouseId,
item.quantity
);
if (!result.success) {
throw new Error(`商品${item.productName}库存不足`);
}
stockResults.push(result);
}
// 6. 创建订单
const order = await this.Order.create({
orderNo,
userId,
items: validatedItems,
totalAmount,
discount,
finalAmount,
paymentMethod,
status: 'pending_payment',
createdAt: new Date()
});
// 7. 记录库存扣减流水
await this.Stock.recordStockLog(order.id, stockResults, 'order_deduct');
// 8. 提交事务
await this.mysql.commit();
// 9. 发送订单创建成功消息
await this.sendOrderCreatedMessage(order);
return order;
} catch (error) {
// 回滚事务
await this.mysql.rollback();
// 如果库存已扣减,需要回滚
await this.rollbackStockDeduction(items);
throw error;
} finally {
// 释放用户锁
await this.Lock.release(lockKey, orderNo);
}
}
// 乐观锁扣减库存
async deductWithOptimisticLock(productId, warehouseId, quantity) {
const maxRetries = 3;
for (let i = 0; i < maxRetries; i++) {
// 查询当前库存和版本
const stock = await this.Stock.findByProductWarehouse(productId, warehouseId);
if (!stock || stock.quantity < quantity) {
return { success: false, reason: '库存不足' };
}
// 尝试扣减
const result = await this.mysql.query(`
UPDATE product_stock
SET quantity = quantity - ?,
version = version + 1
WHERE product_id = ?
AND warehouse_id = ?
AND quantity >= ?
AND version = ?
`, [quantity, productId, warehouseId, quantity, stock.version]);
if (result.affectedRows > 0) {
return {
success: true,
remainingStock: stock.quantity - quantity,
newVersion: stock.version + 1
};
}
// 版本冲突,重试
console.log(`Stock deduction conflict, retry ${i + 1}`);
await this.sleep(50);
}
return { success: false, reason: '库存扣减失败,请重试' };
}
// 生成订单号
generateOrderNo() {
const timestamp = Date.now();
const random = Math.floor(Math.random() * 10000).toString().padStart(4, '0');
return `PSI${timestamp}${random}`;
}
// 验证商品有效性
async validateItems(items) {
const validatedItems = [];
for (const item of items) {
const product = await this.Product.findById(item.productId);
if (!product || product.status !== 'active') {
throw new Error(`商品${item.productId}已下架`);
}
if (product.minQuantity > item.quantity) {
throw new Error(`商品${item.productName}起订量为${product.minQuantity}`);
}
validatedItems.push({
...item,
productName: product.name,
unitPrice: product.price,
product
});
}
return validatedItems;
}
}
高并发优化策略
| 策略 | 实现方式 | 效果 |
|---|---|---|
| 库存预热 | 系统启动时加载库存到 Redis | 减少数据库查询,提升响应速度 |
| 异步下单 | 下单请求进入队列,异步处理 | 削峰填谷,缓解并发压力 |
| 请求限流 | 使用计数器或令牌桶限流 | 防止系统过载 |
| 热点商品隔离 | 针对秒杀商品单独处理 | 避免影响普通商品 |
| 防重复提交 | 前端防抖 + 后端幂等校验 | 避免重复创建订单 |
库存回滚机制
// 订单取消和库存回滚
class OrderCancelService {
constructor(orderModel, stockService) {
this.Order = orderModel;
this.Stock = stockService;
}
// 取消订单并回滚库存
async cancelOrder(orderId, cancelReason) {
const order = await this.Order.findById(orderId);
if (!order) {
throw new Error('订单不存在');
}
if (!this.canCancel(order)) {
throw new Error('该订单不可取消');
}
// 开启事务
await this.mysql.beginTransaction();
try {
// 1. 更新订单状态
await this.Order.update(orderId, {
status: 'cancelled',
cancelReason,
cancelledAt: new Date()
});
// 2. 回滚库存(使用相同的乐观锁机制)
for (const item of order.items) {
const result = await this.Stock.increaseWithOptimisticLock(
item.productId,
item.warehouseId,
item.quantity
);
if (!result.success) {
throw new Error(`库存回滚失败:${item.productName}`);
}
}
// 3. 记录库存变动流水
await this.Stock.recordStockLog(orderId, order.items, 'order_cancel');
// 4. 如果已支付,退款处理
if (order.paymentStatus === 'paid') {
await this.processRefund(order);
}
await this.mysql.commit();
return true;
} catch (error) {
await this.mysql.rollback();
throw error;
}
}
// 定时任务:自动取消超时未支付订单
async autoCancelExpiredOrders() {
const expiredOrders = await this.Order.findExpiredOrders();
for (const order of expiredOrders) {
try {
await this.cancelOrder(order.id, '超时未支付自动取消');
console.log(`Order ${order.orderNo} auto cancelled`);
} catch (error) {
console.error(`Cancel order ${order.orderNo} failed:`, error);
}
}
}
}
总结
订单实时处理与并发控制的核心要点:
- 库存扣减方案:根据业务场景选择乐观锁或分布式锁
- 防重复提交:前端防抖 + 后端幂等多重保障
- 事务一致性:订单创建和库存扣减在同一事务中
- 异步处理:非核心流程异步化,提升系统吞吐量
- 回滚机制:订单取消时正确回滚库存,保持数据一致性
通过合理的并发控制设计,可以确保在高并发场景下订单处理的正确性和系统的稳定性。