This commit is contained in:
tanglong 2025-07-24 11:26:59 +08:00
parent ac45401a00
commit 08790e2e25
9 changed files with 16 additions and 27 deletions

View File

@ -3,6 +3,7 @@ using Microsoft.EntityFrameworkCore;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System.Collections.Concurrent;
using System.Globalization;
using System.Reflection.PortableExecutable;
using System.Runtime.CompilerServices;
@ -108,41 +109,29 @@ namespace YD_AllHeartRates.Api.Mqtt
await foreach (var batch in ReadBatchesAsync(stoppingToken))
{
var heartRateEntities = new List<HeartRateData>();
var jumpRopeEntities = new List<JumpRopeData>();
var heartRateEntities = new ConcurrentBag<HeartRateData>();
var jumpRopeEntities = new ConcurrentBag<JumpRopeData>();
foreach (var msg in batch)
Parallel.ForEach(batch, msg =>
{
if (string.IsNullOrWhiteSpace(msg.Payload)) continue;
if (string.IsNullOrWhiteSpace(msg.Payload)) return;
List<BlePayload>? list = null;
BlePayload payload;
try
{
var trimmed = msg.Payload.TrimStart();
if (trimmed.StartsWith("{"))
{
var single = JsonSerializer.Deserialize<BlePayload>(msg.Payload, _jsonOpt);
if (single != null)
list = new List<BlePayload> { single };
}
else if (trimmed.StartsWith("["))
{
list = JsonSerializer.Deserialize<List<BlePayload>>(msg.Payload, _jsonOpt);
}
payload = JsonSerializer.Deserialize<BlePayload>(msg.Payload, _jsonOpt);
if (payload == null) return;
}
catch (JsonException ex)
{
_log.LogWarning(ex, "Payload 不是合法 JSON: {Json}", msg.Payload);
continue;
return;
}
if (list is null || list.Count == 0) continue;
var heartRateList = payload.devices.Where(x => x.bs_name.Contains("GTY0")).ToList();
var jumpRopeList = payload.devices.Where(x => x.bs_name.Contains("RS207")).ToList();
var heartRateList = list.SelectMany(x => x.devices).Where(x => x.bs_name.Contains("GTY0")).ToList();
var jumpRopeList = list.SelectMany(x => x.devices).Where(x => x.bs_name.Contains("RS207")).ToList();
var scoreTime = DateTime.ParseExact(list.Select(x => x.send_time).First(), "yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture);
var scoreTime = DateTime.ParseExact(payload.send_time, "yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture);
foreach (var ble in heartRateList)
{
@ -291,7 +280,7 @@ namespace YD_AllHeartRates.Api.Mqtt
//RedisHelper.SAdd(studentSetKey, student.StudentNo);
}
}
});
// 心率每分钟保存一次
if ((DateTime.Now - _lastHeartRateSaveTime).TotalSeconds >= 60 && _pendingHeartRates.Any())

View File

@ -43,6 +43,6 @@
"Host": "8.153.108.90",
"Port": 1883,
"Topic": "heartrates/topic",
"BatchSize": 1
"BatchSize": 10
}
}

View File

@ -14,7 +14,7 @@ using System.Reflection;
[assembly: System.Reflection.AssemblyCompanyAttribute("YD_AllHeartRates.Api")]
[assembly: System.Reflection.AssemblyConfigurationAttribute("Debug")]
[assembly: System.Reflection.AssemblyFileVersionAttribute("1.0.0.0")]
[assembly: System.Reflection.AssemblyInformationalVersionAttribute("1.0.0+e2e455beb818072bf47a3f72e7c28aee0d81f2f6")]
[assembly: System.Reflection.AssemblyInformationalVersionAttribute("1.0.0+ac45401a00234bcfe5b0bc7e4b43ab7a3e737f16")]
[assembly: System.Reflection.AssemblyProductAttribute("YD_AllHeartRates.Api")]
[assembly: System.Reflection.AssemblyTitleAttribute("YD_AllHeartRates.Api")]
[assembly: System.Reflection.AssemblyVersionAttribute("1.0.0.0")]

View File

@ -1 +1 @@
24a647519bb5c818af84137d9f158bd3c8bb26dee151c991a4a8d9432822805b
2a88393a86ca30e91023931fadfc25c17abd0bd7e8685dbc1401c11194e83ca9