IoT 系统重复告警问题分析与解决方案 ——Redis 分布式锁实战
情景再现
在铁路隐蔽工程智能监测系统中,设备传感器数据上报频率较高时,会出现同一设备的同类型告警被重复创建的问题。这导致告警记录冗余,影响系统的可靠性和用户体验。
从系统日志中可以观察到:
19:09:35.706 [pool-2-thread-12] DEBUG c.r.i.m.I.selectIotDeviceByDeviceSn - [debug,135] - ==> Parameters: WT6B00000147(String)
19:09:35.723 [pool-2-thread-25] INFO c.r.i.s.i.DataProcessingService - [checkThresholdAndCreateAlarm,151] - 创建新的告警记录 - 设备: WT6B00000147, 类型: 振动超限, 值: 120, 阈值: 50
19:09:35.724 [pool-2-thread-30] INFO c.r.i.s.i.DataProcessingService - [checkThresholdAndCreateAlarm,151] - 创建新的告警记录 - 设备: WT6B00000147, 类型: 振动超限, 值: 120, 阈值: 50- 两个不同的线程(pool-2-thread-25 和 pool-2-thread-30)同时处理同一设备数据
- 在几乎同一时刻(1ms内)创建了相同的告警记录
- 设备SN:WT6B00000147,告警类型:振动超限,数值:120
问题分析
问题根源:
- 传感器数据通过线程池并行处理
- 同一设备的数据可能在不同的线程中同时处理
- 无并发控制机制
原始问题代码分析 - 竞态条件:
// 原始问题代码
if (!isActiveAlarmExist(device.getDeviceSn(), alarmType)) {
createAlarmRecord(device.getDeviceSn(), alarmType, alarmValue, thresholdValue);
}isActiveAlarmExist():检查Redis是否存在告警keycreateAlarmRecord():创建数据库告警记录并设置Redis key- 两个操作非原子性,在并发情况下产生竞态条件
问题解决
核心思路:
先通过 Redis 抢占 “告警锁”,抢到锁再写入数据库;否则直接丢弃。
选择Redis的setIfAbsent操作作为分布式锁的基础:
- Redis单线程模型保证原子性
- 支持过期时间,避免死锁
- 成熟的分布式缓存方案
最终代码:
/**
* 尝试创建告警记录(原子操作,防重复)
*
* @return true表示成功创建了新告警,false表示已存在活跃告警
*/
private boolean tryCreateAlarmRecord(String deviceSn, String alarmType, Long alarmValue, Long thresholdValue) {
String alarmKey = "alarm:active:" + deviceSn + ":" + alarmType;
try {
// 使用Redis的原子操作setIfAbsent检查并设置告警状态
Boolean isNewAlarm = redisTemplate.opsForValue().setIfAbsent(alarmKey, "1", 24 * 60 * 60, TimeUnit.SECONDS);
if (isNewAlarm != null && isNewAlarm) {
// 创建数据库告警记录
createAlarmRecordWithRetry(deviceSn, alarmType, alarmValue, thresholdValue);
} else {
// 告警已存在,不创建新记录
log.info("告警已存在,跳过创建 - 设备: "+deviceSn+", 类型: "+alarmType);
return false;
}
} catch (Exception e) {
log.warning("创建告警时Redis操作失败 - 设备: "+deviceSn+", 类型: "+alarmType+", 错误: "+ e.getMessage());
// 在Redis操作失败的情况下,作为保守策略,仍然尝试创建告警记录
// 这可能导致少量重复,但保证了告警不会丢失
createAlarmRecord(deviceSn, alarmType, alarmValue, thresholdValue);
return true;
}
}
进一步思考:Redis 与数据库的不一致
- 如果
setIfAbsent成功,但createAlarmRecord失败(数据库宕机、事务回滚等),Redis 里会残留一个“假告警”,导致后续真实告警被静默丢弃。 - 你虽然处理了 Redis 异常,但没处理数据库写入失败时的缓存回滚。
本地事务补偿
伪代码:
// 1. Redis 加锁(防重)
boolean lockSuccess = redis.setIfAbsent(key, "1", 24, HOURS);
if (!lockSuccess) {
// 已存在,跳过
return;
}
try {
// 2. 数据库写入
db.insert(alarm);
// 成功:结束
} catch (Exception e) {
// 3. 数据库失败:回滚 Redis
redis.delete(key);
}为了使系统更可靠,我们再为数据库写入添加重试机制,最多重试3次,每次延迟时间递增。
最终代码实现:
/**
* 尝试创建告警记录(原子操作,防重复 + 数据一致性保证)
*
* @return true表示成功创建了新告警,false表示已存在活跃告警
*/
private boolean tryCreateAlarmRecord(String deviceSn, String alarmType, Long alarmValue, Long thresholdValue) {
String alarmKey = "alarm:active:" + deviceSn + ":" + alarmType;
try {
// 使用Redis的原子操作setIfAbsent检查并设置告警状态
Boolean isNewAlarm = redisTemplate.opsForValue().setIfAbsent(alarmKey, "1", 24 * 60 * 60, TimeUnit.SECONDS);
if (isNewAlarm != null && isNewAlarm) {
// 尝试创建数据库告警记录(带重试机制)
boolean dbSuccess = createAlarmRecordWithRetry(deviceSn, alarmType, alarmValue, thresholdValue);
if (dbSuccess) {
// 数据库写入成功,确认告警创建
log.info("告警创建成功 - 设备: "+deviceSn+", 类型: "+alarmType);
return true;
} else {
// 数据库写入失败,回滚Redis缓存以避免"假告警"
rollbackAlarmCache(alarmKey, deviceSn, alarmType);
return false;
}
} else {
// 告警已存在,不创建新记录
log.info("告警已存在,跳过创建 - 设备: "+deviceSn+", 类型: "+alarmType);
return false;
}
} catch (Exception e) {
log.warning("创建告警时Redis操作失败 - 设备: "+deviceSn+", 类型: "+alarmType+", 错误: "+ e.getMessage());
// 在Redis操作失败的情况下,作为保守策略,仍然尝试创建告警记录
// 这可能导致少量重复,但保证了告警不会丢失
createAlarmRecord(deviceSn, alarmType, alarmValue, thresholdValue);
return true;
}
}
/**
* 创建告警记录(带重试机制 + 数据验证)
*/
private boolean createAlarmRecordWithRetry(String deviceSn, String alarmType, Long alarmValue, Long thresholdValue) {
final int maxRetries = 3;
final long retryDelayMs = 100;
for (int attempt = 1; attempt <= maxRetries; attempt++) {
try {
IotAlarm alarm = new IotAlarm();
alarm.setDeviceSn(deviceSn);
alarm.setAlarmType(alarmType);
alarm.setAlarmValue(alarmValue);
alarm.setThresholdValue(thresholdValue);
alarm.setAlarmTime(new Date());
alarm.setStatus(0L);
iotAlarmService.insertIotAlarm(alarm);
// 验证插入是否成功
if (alarm.getId() != null) {
return true; // 数据库写入成功
}
log.info("告警记录插入验证失败(第"+attempt+"次重试)- 设备: "+deviceSn+", 类型: "+alarmType);
} catch (Exception e) {
log.info("告警记录插入失败(第"+attempt+"次重试)- 设备: "+deviceSn+", 类型: "+alarmType+", 错误: "+e.getMessage());
if (attempt < maxRetries) {
try {
Thread.sleep(retryDelayMs * attempt); // 指数退避
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
log.warning("告警记录插入失败,已达到最大重试次数 - 设备: "+deviceSn+", 类型: "+alarmType);
return false;
}
/**
* 回滚告警缓存(保证数据一致性)
*/
private void rollbackAlarmCache(String alarmKey, String deviceSn, String alarmType) {
try {
Boolean deleted = redisTemplate.delete(alarmKey);
if (deleted != null && deleted) {
log.info("已回滚告警缓存 - 设备: "+deviceSn+", 类型: "+alarmType+", Key: "+alarmKey);
} else {
log.warning("回滚告警缓存失败(可能已被其他线程处理)- 设备: "+deviceSn+", 类型: "+alarmType+", Key: "+alarmKey);
}
} catch (Exception e) {
log.warning("回滚告警缓存时发生异常 - 设备: "+deviceSn+", 类型: "+alarmType+", 错误: "+e.getMessage());
}
}方案效果与总结
- 彻底解决重复告警问题
- 高并发下性能损耗小
- Redis + DB 最终一致性得到保障
- 异常降级策略保证告警不丢失
- 适合 IoT、监控、上报类高并发场景
版权申明
本文系作者 @xiin 原创发布在To Future$站点。未经许可,禁止转载。
暂无评论数据