情景再现

在铁路隐蔽工程智能监测系统中,设备传感器数据上报频率较高时,会出现同一设备的同类型告警被重复创建的问题。这导致告警记录冗余,影响系统的可靠性和用户体验。

从系统日志中可以观察到:

19:09:35.706 [pool-2-thread-12] DEBUG c.r.i.m.I.selectIotDeviceByDeviceSn - [debug,135] - ==> Parameters: WT6B00000147(String)
19:09:35.723 [pool-2-thread-25] INFO  c.r.i.s.i.DataProcessingService - [checkThresholdAndCreateAlarm,151] - 创建新的告警记录 - 设备: WT6B00000147, 类型: 振动超限, 值: 120, 阈值: 50
19:09:35.724 [pool-2-thread-30] INFO  c.r.i.s.i.DataProcessingService - [checkThresholdAndCreateAlarm,151] - 创建新的告警记录 - 设备: WT6B00000147, 类型: 振动超限, 值: 120, 阈值: 50
  • 两个不同的线程(pool-2-thread-25 和 pool-2-thread-30)同时处理同一设备数据
  • 在几乎同一时刻(1ms内)创建了相同的告警记录
  • 设备SN:WT6B00000147,告警类型:振动超限,数值:120

问题分析

问题根源:

  • 传感器数据通过线程池并行处理
  • 同一设备的数据可能在不同的线程中同时处理
  • 无并发控制机制

原始问题代码分析 - 竞态条件:

// 原始问题代码
if (!isActiveAlarmExist(device.getDeviceSn(), alarmType)) {
    createAlarmRecord(device.getDeviceSn(), alarmType, alarmValue, thresholdValue);
}
  • isActiveAlarmExist():检查Redis是否存在告警key
  • createAlarmRecord():创建数据库告警记录并设置Redis key
  • 两个操作非原子性,在并发情况下产生竞态条件

问题解决

核心思路:

先通过 Redis 抢占 “告警锁”,抢到锁再写入数据库;否则直接丢弃。

选择Redis的setIfAbsent操作作为分布式锁的基础:

  • Redis单线程模型保证原子性
  • 支持过期时间,避免死锁
  • 成熟的分布式缓存方案

最终代码:

/**
 * 尝试创建告警记录(原子操作,防重复)
 * 
 * @return true表示成功创建了新告警,false表示已存在活跃告警
 */
private boolean tryCreateAlarmRecord(String deviceSn, String alarmType, Long alarmValue, Long thresholdValue) {
    String alarmKey = "alarm:active:" + deviceSn + ":" + alarmType;
    
    try {
        // 使用Redis的原子操作setIfAbsent检查并设置告警状态
        Boolean isNewAlarm = redisTemplate.opsForValue().setIfAbsent(alarmKey, "1", 24 * 60 * 60, TimeUnit.SECONDS);
        
        if (isNewAlarm != null && isNewAlarm) {
            // 创建数据库告警记录
            createAlarmRecordWithRetry(deviceSn, alarmType, alarmValue, thresholdValue);
        } else {
            // 告警已存在,不创建新记录
            log.info("告警已存在,跳过创建 - 设备: "+deviceSn+", 类型: "+alarmType);
            return false;
        }
    } catch (Exception e) {
        log.warning("创建告警时Redis操作失败 - 设备: "+deviceSn+", 类型: "+alarmType+", 错误: "+ e.getMessage());
        // 在Redis操作失败的情况下,作为保守策略,仍然尝试创建告警记录
        // 这可能导致少量重复,但保证了告警不会丢失
        createAlarmRecord(deviceSn, alarmType, alarmValue, thresholdValue);
        return true;
    }
}

进一步思考:Redis 与数据库的不一致

  • 如果 setIfAbsent 成功,但 createAlarmRecord 失败(数据库宕机、事务回滚等),Redis 里会残留一个“假告警”,导致后续真实告警被静默丢弃。
  • 你虽然处理了 Redis 异常,但没处理数据库写入失败时的缓存回滚。

本地事务补偿

伪代码:

// 1. Redis 加锁(防重)
boolean lockSuccess = redis.setIfAbsent(key, "1", 24, HOURS);

if (!lockSuccess) {
    // 已存在,跳过
    return;
}

try {
    // 2. 数据库写入
    db.insert(alarm);
    // 成功:结束
} catch (Exception e) {
    // 3. 数据库失败:回滚 Redis
    redis.delete(key);
}

为了使系统更可靠,我们再为数据库写入添加重试机制,最多重试3次,每次延迟时间递增。

最终代码实现:

/**
 * 尝试创建告警记录(原子操作,防重复 + 数据一致性保证)
 * 
 * @return true表示成功创建了新告警,false表示已存在活跃告警
 */
private boolean tryCreateAlarmRecord(String deviceSn, String alarmType, Long alarmValue, Long thresholdValue) {
    String alarmKey = "alarm:active:" + deviceSn + ":" + alarmType;
    
    try {
        // 使用Redis的原子操作setIfAbsent检查并设置告警状态
        Boolean isNewAlarm = redisTemplate.opsForValue().setIfAbsent(alarmKey, "1", 24 * 60 * 60, TimeUnit.SECONDS);
        
        if (isNewAlarm != null && isNewAlarm) {
            // 尝试创建数据库告警记录(带重试机制)
            boolean dbSuccess = createAlarmRecordWithRetry(deviceSn, alarmType, alarmValue, thresholdValue);
            
            if (dbSuccess) {
                // 数据库写入成功,确认告警创建
                log.info("告警创建成功 - 设备: "+deviceSn+", 类型: "+alarmType);
                return true;
            } else {
                // 数据库写入失败,回滚Redis缓存以避免"假告警"
                rollbackAlarmCache(alarmKey, deviceSn, alarmType);
                return false;
            }
        } else {
            // 告警已存在,不创建新记录
            log.info("告警已存在,跳过创建 - 设备: "+deviceSn+", 类型: "+alarmType);
            return false;
        }
    } catch (Exception e) {
        log.warning("创建告警时Redis操作失败 - 设备: "+deviceSn+", 类型: "+alarmType+", 错误: "+ e.getMessage());
        // 在Redis操作失败的情况下,作为保守策略,仍然尝试创建告警记录
        // 这可能导致少量重复,但保证了告警不会丢失
        createAlarmRecord(deviceSn, alarmType, alarmValue, thresholdValue);
        return true;
    }
}

/**
 * 创建告警记录(带重试机制 + 数据验证)
 */
private boolean createAlarmRecordWithRetry(String deviceSn, String alarmType, Long alarmValue, Long thresholdValue) {
    final int maxRetries = 3;
    final long retryDelayMs = 100;
    
    for (int attempt = 1; attempt <= maxRetries; attempt++) {
        try {
            IotAlarm alarm = new IotAlarm();
            alarm.setDeviceSn(deviceSn);
            alarm.setAlarmType(alarmType);
            alarm.setAlarmValue(alarmValue);
            alarm.setThresholdValue(thresholdValue);
            alarm.setAlarmTime(new Date());
            alarm.setStatus(0L);
            
            iotAlarmService.insertIotAlarm(alarm);
            
            // 验证插入是否成功
            if (alarm.getId() != null) {
                return true; // 数据库写入成功
            }
            
            log.info("告警记录插入验证失败(第"+attempt+"次重试)- 设备: "+deviceSn+", 类型: "+alarmType);
            
        } catch (Exception e) {
            log.info("告警记录插入失败(第"+attempt+"次重试)- 设备: "+deviceSn+", 类型: "+alarmType+", 错误: "+e.getMessage());
            
            if (attempt < maxRetries) {
                try {
                    Thread.sleep(retryDelayMs * attempt); // 指数退避
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
    
    log.warning("告警记录插入失败,已达到最大重试次数 - 设备: "+deviceSn+", 类型: "+alarmType);
    return false;
}

/**
 * 回滚告警缓存(保证数据一致性)
 */
private void rollbackAlarmCache(String alarmKey, String deviceSn, String alarmType) {
    try {
        Boolean deleted = redisTemplate.delete(alarmKey);
        if (deleted != null && deleted) {
            log.info("已回滚告警缓存 - 设备: "+deviceSn+", 类型: "+alarmType+", Key: "+alarmKey);
        } else {
            log.warning("回滚告警缓存失败(可能已被其他线程处理)- 设备: "+deviceSn+", 类型: "+alarmType+", Key: "+alarmKey);
        }
    } catch (Exception e) {
        log.warning("回滚告警缓存时发生异常 - 设备: "+deviceSn+", 类型: "+alarmType+", 错误: "+e.getMessage());
    }
}

方案效果与总结

  • 彻底解决重复告警问题
  • 高并发下性能损耗小
  • Redis + DB 最终一致性得到保障
  • 异常降级策略保证告警不丢失
  • 适合 IoT、监控、上报类高并发场景
分类: Java后端项目相关 标签: Java精选Redis

评论

暂无评论数据

暂无评论数据

目录