This commit is contained in:
tanglong 2025-07-24 11:40:55 +08:00
parent 19fc9d1963
commit 9bdd1b5236

View File

@ -1,5 +1,6 @@

using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.FileSystemGlobbing.Internal;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
@ -112,173 +113,125 @@ namespace YD_AllHeartRates.Api.Mqtt
var heartRateEntities = new ConcurrentBag<HeartRateData>();
var jumpRopeEntities = new ConcurrentBag<JumpRopeData>();
Parallel.ForEach(batch, msg =>
{
if (string.IsNullOrWhiteSpace(msg.Payload)) return;
var deviceMap = _devices.ToDictionary(x => x.Code, x => x.StudentNo);
var studentMap = _studentList.ToDictionary(x => x.StudentNo, x => x);
BlePayload payload;
Parallel.ForEach(batch, new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }, msg =>
{
try
{
payload = JsonSerializer.Deserialize<BlePayload>(msg.Payload, _jsonOpt);
if (payload == null) return;
}
catch (JsonException ex)
{
_log.LogWarning(ex, "Payload 不是合法 JSON: {Json}", msg.Payload);
return;
}
if (string.IsNullOrWhiteSpace(msg.Payload)) return;
var payload = JsonSerializer.Deserialize<BlePayload>(msg.Payload, _jsonOpt);
if (payload == null || payload.devices == null || payload.devices.Count == 0) return;
var heartRateList = payload.devices.Where(x => x.bs_name.Contains("GTY0")).ToList();
var jumpRopeList = payload.devices.Where(x => x.bs_name.Contains("RS207")).ToList();
var scoreTime = DateTime.ParseExact(payload.send_time, "yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture);
foreach (var ble in heartRateList)
{
if (string.IsNullOrWhiteSpace(ble.Broadcast_data)) continue;
var studentNo = _devices.Where(x => x.Code == ble.bs_name).Select(x => x.StudentNo).FirstOrDefault();
if (string.IsNullOrEmpty(studentNo))
continue;
var student = _studentList.FirstOrDefault(x => x.StudentNo == studentNo);
if (student == null || student.GradeId == 0 || student.ClassId == 0) continue;
var data = ParseHexData(ble.Broadcast_data);
if (data == null) continue;
int cd = Array.IndexOf(data, (byte)0xCD);
if (cd < 0 || data.Length < cd + 9) continue;
int battery = data[cd + 1];
int heartRate = data[cd + 2];
if (heartRate == 0) continue;
var entity = new HeartRateData
var scoreTime = DateTime.ParseExact(payload.send_time, "yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture);
foreach (var ble in payload.devices)
{
ScoreTime = scoreTime,
Code = ble.bs_name,
GradeId = student.GradeId,
GradeName = student.GradeName,
ClassId = student.ClassId,
ClassName = student.ClassName ?? "",
SchoolCode = student.SchoolCode ?? "",
Sex = student.Sex,
Value = heartRate,
QuantityOfElectricity = battery,
StudentNo = student.StudentNo,
StudentName = student.StudentName,
Strength = (int)Math.Round(((double)heartRate / (220 - student.Age)) * 100)
};
if (!deviceMap.TryGetValue(ble.bs_name, out var studentNo)) continue;
if (!studentMap.TryGetValue(studentNo, out var student)) continue;
if (string.IsNullOrWhiteSpace(ble.Broadcast_data)) continue;
heartRateEntities.Add(entity);
var heartRateKey = $"heartRate:{student.StudentNo}";
_caching.AddObject(heartRateKey, entity, 60); // 1分钟缓存
var data = ParseHexData(ble.Broadcast_data);
if (data == null) continue;
// 更新学校学生编号集合Set
var studentSetKey = $"school:{student.SchoolCode}:students";
RedisHelper.SAdd(studentSetKey, student.StudentNo); // 自动去重
RedisHelper.Expire(studentSetKey, 60);
}
foreach (var ble in jumpRopeList)
{
if (string.IsNullOrWhiteSpace(ble.Broadcast_data)) continue;
var studentNo = _devices.Where(x => x.Code == ble.bs_name).Select(x => x.StudentNo).FirstOrDefault();
if (string.IsNullOrEmpty(studentNo))
continue;
var student = _studentList.FirstOrDefault(x => x.StudentNo == studentNo);
if (student == null || student.GradeId == 0 || student.ClassId == 0) continue;
var data = ParseHexData(ble.Broadcast_data);
if (data == null) continue;
int mfIndex = IndexOfSequence(data, new byte[] { 0xFF, 0x04, 0xFF, 0xCF });
if (mfIndex < 0 || data.Length < mfIndex + 10) continue;
int jumpCount = data[mfIndex + 5];
int errorCount = data[mfIndex + 7];
int battery = data[mfIndex + 10];
var jumpData = new JumpRopeData
{
ScoreTime = scoreTime,
Code = ble.bs_name,
GradeId = student.GradeId,
GradeName = student.GradeName,
ClassId = student.ClassId,
ClassName = student.ClassName ?? "",
SchoolCode = student.SchoolCode ?? "",
Sex = student.Sex,
JumpValue = jumpCount,
QuantityOfElectricity = battery,
ErrorNumber = errorCount,
StudentNo = student.StudentNo,
StudentName = student.StudentName
};
//if (!string.IsNullOrWhiteSpace(student.SchoolCode))
//{
// // WebSocket 推送
// await _wsSender.SendToSchoolAsync(student.SchoolCode, "heartRate", heartRateEntities);
// await _wsSender.SendToSchoolAsync(student.SchoolCode, "jumpRope", _jumpDailyMap.Values.ToList());
//}
var jumpKey = $"jumpRope:active:{student.StudentNo}:{DateTime.Now:yyyyMMdd}";
var rawKey = $"jumpRope:raw:{student.StudentNo}:{DateTime.Now:yyyyMMdd}";
// 当前缓存的总值(当天跳绳数据)
var totalData = _caching.Get<JumpRopeData>(jumpKey);
// 初始化总值(首次记录)
if (totalData == null)
{
totalData = new JumpRopeData
if (ble.bs_name.Contains("GTY0"))
{
StudentNo = jumpData.StudentNo,
JumpValue = 0,
ErrorNumber = 0,
QuantityOfElectricity = jumpData.QuantityOfElectricity,
ScoreTime = jumpData.ScoreTime
};
int cd = Array.IndexOf(data, (byte)0xCD);
if (cd < 0 || data.Length < cd + 9) continue;
int battery = data[cd + 1];
int heartRate = data[cd + 2];
if (heartRate == 0) continue;
var entity = new HeartRateData
{
ScoreTime = scoreTime,
Code = ble.bs_name,
GradeId = student.GradeId,
GradeName = student.GradeName,
ClassId = student.ClassId,
ClassName = student.ClassName ?? "",
SchoolCode = student.SchoolCode ?? "",
Sex = student.Sex,
Value = heartRate,
QuantityOfElectricity = battery,
StudentNo = student.StudentNo,
StudentName = student.StudentName,
Strength = (int)Math.Round(((double)heartRate / (220 - student.Age)) * 100)
};
heartRateEntities.Add(entity);
_caching.AddObject($"heartRate:{student.StudentNo}", entity, 60);
RedisHelper.SAdd($"school:{student.SchoolCode}:students", student.StudentNo);
}
else if (ble.bs_name.Contains("RS207"))
{
int mfIndex = IndexOfSequence(data, new byte[] { 0xFF, 0x04, 0xFF, 0xCF });
if (mfIndex < 0 || data.Length < mfIndex + 10) continue;
int jumpCount = data[mfIndex + 5];
int errorCount = data[mfIndex + 7];
int battery = data[mfIndex + 10];
var jumpData = new JumpRopeData
{
ScoreTime = scoreTime,
Code = ble.bs_name,
GradeId = student.GradeId,
GradeName = student.GradeName,
ClassId = student.ClassId,
ClassName = student.ClassName ?? "",
SchoolCode = student.SchoolCode ?? "",
Sex = student.Sex,
JumpValue = jumpCount,
QuantityOfElectricity = battery,
ErrorNumber = errorCount,
StudentNo = student.StudentNo,
StudentName = student.StudentName
};
var jumpKey = $"jumpRope:active:{student.StudentNo}:{DateTime.Now:yyyyMMdd}";
var rawKey = $"jumpRope:raw:{student.StudentNo}:{DateTime.Now:yyyyMMdd}";
var totalData = _caching.Get<JumpRopeData>(jumpKey) ?? new JumpRopeData
{
StudentNo = jumpData.StudentNo,
JumpValue = 0,
ErrorNumber = 0,
QuantityOfElectricity = jumpData.QuantityOfElectricity,
ScoreTime = jumpData.ScoreTime
};
var lastRaw = _caching.Get<JumpRopeData>(rawKey);
int deltaJump = jumpData.JumpValue;
int deltaError = jumpData.ErrorNumber;
if (lastRaw != null)
{
deltaJump = jumpData.JumpValue >= lastRaw.JumpValue
? jumpData.JumpValue - lastRaw.JumpValue
: jumpData.JumpValue;
deltaError = jumpData.ErrorNumber >= lastRaw.ErrorNumber
? jumpData.ErrorNumber - lastRaw.ErrorNumber
: jumpData.ErrorNumber;
}
totalData.JumpValue += deltaJump;
totalData.ErrorNumber += deltaError;
totalData.QuantityOfElectricity = jumpData.QuantityOfElectricity;
totalData.ScoreTime = jumpData.ScoreTime;
_caching.AddObject(jumpKey, totalData, 28800);
_caching.AddObject(rawKey, jumpData, 60);
}
}
// 上一条原始累计值(用于计算差值)
var lastRaw = _caching.Get<JumpRopeData>(rawKey);
// 差值计算
int deltaJump = jumpData.JumpValue;
int deltaError = jumpData.ErrorNumber;
if (lastRaw != null)
{
deltaJump = jumpData.JumpValue >= lastRaw.JumpValue
? jumpData.JumpValue - lastRaw.JumpValue
: jumpData.JumpValue;
deltaError = jumpData.ErrorNumber >= lastRaw.ErrorNumber
? jumpData.ErrorNumber - lastRaw.ErrorNumber
: jumpData.ErrorNumber;
}
// 汇总
totalData.JumpValue += deltaJump;
totalData.ErrorNumber += deltaError;
totalData.QuantityOfElectricity = jumpData.QuantityOfElectricity;
totalData.ScoreTime = jumpData.ScoreTime;
// 写入缓存(总值 + 原始值)
_caching.AddObject(jumpKey, totalData, 28800);
_caching.AddObject(rawKey, jumpData, 60);
// 记录活跃学生编号(便于定时入库)
//RedisHelper.SAdd($"jumpRope:active:{DateTime.Now:yyyyMMdd}", student.StudentNo);
//var studentSetKey = $"school:{student.SchoolCode}:students";
//var jumpKey = $"jumpRope:{student.StudentNo}";
//_caching.AddObject(jumpKey, jumpData, 60);
//RedisHelper.SAdd(studentSetKey, student.StudentNo);
}
catch (Exception ex)
{
_log.LogError(ex, "处理 BLE 消息异常");
}
});