From 9bdd1b523695d9b5493529612cf0dea490d1728b Mon Sep 17 00:00:00 2001 From: tanglong <842690096@qq.com> Date: Thu, 24 Jul 2025 11:40:55 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AF=95=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Mqtt/MqttBackgroundService.cs | 271 ++++++++---------- 1 file changed, 112 insertions(+), 159 deletions(-) diff --git a/YD_AllHeartRates.Api/Mqtt/MqttBackgroundService.cs b/YD_AllHeartRates.Api/Mqtt/MqttBackgroundService.cs index 9e97da1..f17e12b 100644 --- a/YD_AllHeartRates.Api/Mqtt/MqttBackgroundService.cs +++ b/YD_AllHeartRates.Api/Mqtt/MqttBackgroundService.cs @@ -1,5 +1,6 @@  using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.FileSystemGlobbing.Internal; using MQTTnet; using MQTTnet.Client; using MQTTnet.Protocol; @@ -110,175 +111,127 @@ namespace YD_AllHeartRates.Api.Mqtt await foreach (var batch in ReadBatchesAsync(stoppingToken)) { var heartRateEntities = new ConcurrentBag(); - var jumpRopeEntities = new ConcurrentBag(); + var jumpRopeEntities = new ConcurrentBag(); + + var deviceMap = _devices.ToDictionary(x => x.Code, x => x.StudentNo); + var studentMap = _studentList.ToDictionary(x => x.StudentNo, x => x); - Parallel.ForEach(batch, msg => + Parallel.ForEach(batch, new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }, msg => { - if (string.IsNullOrWhiteSpace(msg.Payload)) return; - - BlePayload payload; try { - payload = JsonSerializer.Deserialize(msg.Payload, _jsonOpt); - if (payload == null) return; - } - catch (JsonException ex) - { - _log.LogWarning(ex, "Payload 不是合法 JSON: {Json}", msg.Payload); - return; - } + if (string.IsNullOrWhiteSpace(msg.Payload)) return; + var payload = JsonSerializer.Deserialize(msg.Payload, _jsonOpt); + if (payload == null || payload.devices == null || payload.devices.Count == 0) return; - var heartRateList = payload.devices.Where(x => x.bs_name.Contains("GTY0")).ToList(); - var jumpRopeList = payload.devices.Where(x => x.bs_name.Contains("RS207")).ToList(); - - var scoreTime = DateTime.ParseExact(payload.send_time, "yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture); - - foreach (var ble in heartRateList) - { - if (string.IsNullOrWhiteSpace(ble.Broadcast_data)) continue; - - var studentNo = _devices.Where(x => x.Code == ble.bs_name).Select(x => x.StudentNo).FirstOrDefault(); - if (string.IsNullOrEmpty(studentNo)) - continue; - var student = _studentList.FirstOrDefault(x => x.StudentNo == studentNo); - if (student == null || student.GradeId == 0 || student.ClassId == 0) continue; - - var data = ParseHexData(ble.Broadcast_data); - if (data == null) continue; - - int cd = Array.IndexOf(data, (byte)0xCD); - if (cd < 0 || data.Length < cd + 9) continue; - - int battery = data[cd + 1]; - int heartRate = data[cd + 2]; - if (heartRate == 0) continue; - - var entity = new HeartRateData + var scoreTime = DateTime.ParseExact(payload.send_time, "yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture); + foreach (var ble in payload.devices) { - ScoreTime = scoreTime, - Code = ble.bs_name, - GradeId = student.GradeId, - GradeName = student.GradeName, - ClassId = student.ClassId, - ClassName = student.ClassName ?? "", - SchoolCode = student.SchoolCode ?? "", - Sex = student.Sex, - Value = heartRate, - QuantityOfElectricity = battery, - StudentNo = student.StudentNo, - StudentName = student.StudentName, - Strength = (int)Math.Round(((double)heartRate / (220 - student.Age)) * 100) - }; + if (!deviceMap.TryGetValue(ble.bs_name, out var studentNo)) continue; + if (!studentMap.TryGetValue(studentNo, out var student)) continue; + if (string.IsNullOrWhiteSpace(ble.Broadcast_data)) continue; - heartRateEntities.Add(entity); - var heartRateKey = $"heartRate:{student.StudentNo}"; - _caching.AddObject(heartRateKey, entity, 60); // 1分钟缓存 + var data = ParseHexData(ble.Broadcast_data); + if (data == null) continue; - // 更新学校学生编号集合(Set) - var studentSetKey = $"school:{student.SchoolCode}:students"; - RedisHelper.SAdd(studentSetKey, student.StudentNo); // 自动去重 - RedisHelper.Expire(studentSetKey, 60); - } - - foreach (var ble in jumpRopeList) - { - if (string.IsNullOrWhiteSpace(ble.Broadcast_data)) continue; - - var studentNo = _devices.Where(x => x.Code == ble.bs_name).Select(x => x.StudentNo).FirstOrDefault(); - if (string.IsNullOrEmpty(studentNo)) - continue; - var student = _studentList.FirstOrDefault(x => x.StudentNo == studentNo); - if (student == null || student.GradeId == 0 || student.ClassId == 0) continue; - - var data = ParseHexData(ble.Broadcast_data); - if (data == null) continue; - - int mfIndex = IndexOfSequence(data, new byte[] { 0xFF, 0x04, 0xFF, 0xCF }); - if (mfIndex < 0 || data.Length < mfIndex + 10) continue; - - int jumpCount = data[mfIndex + 5]; - int errorCount = data[mfIndex + 7]; - int battery = data[mfIndex + 10]; - - var jumpData = new JumpRopeData - { - ScoreTime = scoreTime, - Code = ble.bs_name, - GradeId = student.GradeId, - GradeName = student.GradeName, - ClassId = student.ClassId, - ClassName = student.ClassName ?? "", - SchoolCode = student.SchoolCode ?? "", - Sex = student.Sex, - JumpValue = jumpCount, - QuantityOfElectricity = battery, - ErrorNumber = errorCount, - StudentNo = student.StudentNo, - StudentName = student.StudentName - }; - - //if (!string.IsNullOrWhiteSpace(student.SchoolCode)) - //{ - // // WebSocket 推送 - // await _wsSender.SendToSchoolAsync(student.SchoolCode, "heartRate", heartRateEntities); - // await _wsSender.SendToSchoolAsync(student.SchoolCode, "jumpRope", _jumpDailyMap.Values.ToList()); - //} - - var jumpKey = $"jumpRope:active:{student.StudentNo}:{DateTime.Now:yyyyMMdd}"; - var rawKey = $"jumpRope:raw:{student.StudentNo}:{DateTime.Now:yyyyMMdd}"; - - // 当前缓存的总值(当天跳绳数据) - var totalData = _caching.Get(jumpKey); - - // 初始化总值(首次记录) - if (totalData == null) - { - totalData = new JumpRopeData + if (ble.bs_name.Contains("GTY0")) { - StudentNo = jumpData.StudentNo, - JumpValue = 0, - ErrorNumber = 0, - QuantityOfElectricity = jumpData.QuantityOfElectricity, - ScoreTime = jumpData.ScoreTime - }; + int cd = Array.IndexOf(data, (byte)0xCD); + if (cd < 0 || data.Length < cd + 9) continue; + + int battery = data[cd + 1]; + int heartRate = data[cd + 2]; + if (heartRate == 0) continue; + + var entity = new HeartRateData + { + ScoreTime = scoreTime, + Code = ble.bs_name, + GradeId = student.GradeId, + GradeName = student.GradeName, + ClassId = student.ClassId, + ClassName = student.ClassName ?? "", + SchoolCode = student.SchoolCode ?? "", + Sex = student.Sex, + Value = heartRate, + QuantityOfElectricity = battery, + StudentNo = student.StudentNo, + StudentName = student.StudentName, + Strength = (int)Math.Round(((double)heartRate / (220 - student.Age)) * 100) + }; + + heartRateEntities.Add(entity); + _caching.AddObject($"heartRate:{student.StudentNo}", entity, 60); + RedisHelper.SAdd($"school:{student.SchoolCode}:students", student.StudentNo); + } + + else if (ble.bs_name.Contains("RS207")) + { + int mfIndex = IndexOfSequence(data, new byte[] { 0xFF, 0x04, 0xFF, 0xCF }); + if (mfIndex < 0 || data.Length < mfIndex + 10) continue; + + int jumpCount = data[mfIndex + 5]; + int errorCount = data[mfIndex + 7]; + int battery = data[mfIndex + 10]; + + var jumpData = new JumpRopeData + { + ScoreTime = scoreTime, + Code = ble.bs_name, + GradeId = student.GradeId, + GradeName = student.GradeName, + ClassId = student.ClassId, + ClassName = student.ClassName ?? "", + SchoolCode = student.SchoolCode ?? "", + Sex = student.Sex, + JumpValue = jumpCount, + QuantityOfElectricity = battery, + ErrorNumber = errorCount, + StudentNo = student.StudentNo, + StudentName = student.StudentName + }; + + var jumpKey = $"jumpRope:active:{student.StudentNo}:{DateTime.Now:yyyyMMdd}"; + var rawKey = $"jumpRope:raw:{student.StudentNo}:{DateTime.Now:yyyyMMdd}"; + + var totalData = _caching.Get(jumpKey) ?? new JumpRopeData + { + StudentNo = jumpData.StudentNo, + JumpValue = 0, + ErrorNumber = 0, + QuantityOfElectricity = jumpData.QuantityOfElectricity, + ScoreTime = jumpData.ScoreTime + }; + + var lastRaw = _caching.Get(rawKey); + + int deltaJump = jumpData.JumpValue; + int deltaError = jumpData.ErrorNumber; + + if (lastRaw != null) + { + deltaJump = jumpData.JumpValue >= lastRaw.JumpValue + ? jumpData.JumpValue - lastRaw.JumpValue + : jumpData.JumpValue; + + deltaError = jumpData.ErrorNumber >= lastRaw.ErrorNumber + ? jumpData.ErrorNumber - lastRaw.ErrorNumber + : jumpData.ErrorNumber; + } + + totalData.JumpValue += deltaJump; + totalData.ErrorNumber += deltaError; + totalData.QuantityOfElectricity = jumpData.QuantityOfElectricity; + totalData.ScoreTime = jumpData.ScoreTime; + + _caching.AddObject(jumpKey, totalData, 28800); + _caching.AddObject(rawKey, jumpData, 60); + } } - - // 上一条原始累计值(用于计算差值) - var lastRaw = _caching.Get(rawKey); - - // 差值计算 - int deltaJump = jumpData.JumpValue; - int deltaError = jumpData.ErrorNumber; - - if (lastRaw != null) - { - deltaJump = jumpData.JumpValue >= lastRaw.JumpValue - ? jumpData.JumpValue - lastRaw.JumpValue - : jumpData.JumpValue; - - deltaError = jumpData.ErrorNumber >= lastRaw.ErrorNumber - ? jumpData.ErrorNumber - lastRaw.ErrorNumber - : jumpData.ErrorNumber; - } - - // 汇总 - totalData.JumpValue += deltaJump; - totalData.ErrorNumber += deltaError; - totalData.QuantityOfElectricity = jumpData.QuantityOfElectricity; - totalData.ScoreTime = jumpData.ScoreTime; - - // 写入缓存(总值 + 原始值) - _caching.AddObject(jumpKey, totalData, 28800); - _caching.AddObject(rawKey, jumpData, 60); - - // 记录活跃学生编号(便于定时入库) - //RedisHelper.SAdd($"jumpRope:active:{DateTime.Now:yyyyMMdd}", student.StudentNo); - - //var studentSetKey = $"school:{student.SchoolCode}:students"; - //var jumpKey = $"jumpRope:{student.StudentNo}"; - //_caching.AddObject(jumpKey, jumpData, 60); - - //RedisHelper.SAdd(studentSetKey, student.StudentNo); + } + catch (Exception ex) + { + _log.LogError(ex, "处理 BLE 消息异常"); } });