2025-07-15 16:27:35 +08:00

302 lines
13 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using Microsoft.EntityFrameworkCore;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System.Reflection.PortableExecutable;
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.Json;
using System.Threading.Channels;
using YD_AllHeartRates.Api.Context;
using YD_AllHeartRates.Api.Entitys;
using YD_AllHeartRates.Api.SmartSportsEntitys;
using YD_AllHeartRates.Api.Utilities;
using YD_AllHeartRates.Api.WebSocket;
using YD_AllHeartRates.Commons.Dto.Mqtt;
using YD_AllHeartRates.Commons.Dto.Student;
using YD_AllHeartRates.Commons.MemoryCaches;
namespace YD_AllHeartRates.Api.Mqtt
{
/// <summary>
/// Background service that receives incoming MQTT data and writes it to storage.
/// </summary>
public class MqttBackgroundService : BackgroundService
{
// MQTT client used to connect to the broker, subscribe and receive messages.
private readonly IMqttClient _client;
// Logger for this service.
private readonly ILogger _log;
// EF Core context that heart-rate and jump-rope rows are saved through.
private readonly UserContext _userContext;
// EF Core context queried once in the constructor for devices, students and classes.
private readonly SmartSportsContext _smartSportsContext;
// Cache that receives the latest per-student reading (written with a 60 argument, treated as seconds).
private readonly ICaching _caching;
// Device-to-student lookup list; replaced in the constructor with the query result and not refreshed afterwards.
private readonly List<StudentDto> _studentList = new();
// Heart-rate rows collected since the last database save.
private readonly List<HeartRateData> _pendingHeartRates = new();
// Per-student daily jump-rope aggregate, keyed by "{StudentNo}_{yyyyMMdd}".
private readonly Dictionary<string, JumpRopeData> _jumpDailyMap = new();
// UTC time of the last heart-rate save; used to throttle saves to one per 60 seconds.
private DateTime _lastHeartRateSaveTime = DateTime.UtcNow;
// UTC time of the last jump-rope save; used to throttle saves to one per 60 seconds.
private DateTime _lastJumpRopeSaveTime = DateTime.UtcNow;
// Unbounded hand-off queue between the MQTT receive handler (writer) and the batch loop (reader).
private readonly Channel<MqttMessage> _queue = Channel.CreateUnbounded<MqttMessage>();
// Shared JSON options: property names are matched case-insensitively when deserializing payloads.
static readonly JsonSerializerOptions _jsonOpt = new() { PropertyNameCaseInsensitive = true };
/// <summary>
/// Stores the injected dependencies and eagerly loads the device-to-student lookup list.
/// </summary>
/// <param name="client">MQTT client used by the service.</param>
/// <param name="log">Logger for this service.</param>
/// <param name="userContext">Context the decoded readings are saved through.</param>
/// <param name="smartSportsContext">Context queried here for devices, students and classes.</param>
/// <param name="caching">Cache that receives the latest readings.</param>
public MqttBackgroundService(
    IMqttClient client,
    ILogger<MqttBackgroundService> log,
    UserContext userContext,
    SmartSportsContext smartSportsContext,
    ICaching caching
)
{
    _caching = caching;
    _smartSportsContext = smartSportsContext;
    _userContext = userContext;
    _log = log;
    _client = client;

    // Every device joined to its student and the student's class; only students
    // whose StudentStatus is 1 are kept. The device code is used for both the
    // heart-rate id and the jump-rope id.
    var studentsWithDevices =
        from device in _smartSportsContext.Device
        join student in _smartSportsContext.Student on device.StudentNo equals student.StudentNo
        join schoolClass in _smartSportsContext.Class on student.ClassId equals schoolClass.Id
        where student.StudentStatus == 1
        select new StudentDto
        {
            SchoolCode = student.SchoolCode,
            StudentNo = student.StudentNo,
            StudentName = student.StudentName,
            Sex = student.Sex,
            Age = student.Age,
            HeartRateId = device.Code,
            JumpRopeId = device.Code,
            ClassId = student.ClassId,
            ClassName = student.ClassName,
            GradeId = schoolClass.GradeId,
            GradeName = schoolClass.GradeName ?? "",
            DeviceType = device.DeviceType,
        };

    // Materialise once; the list is used as an in-memory lookup afterwards.
    _studentList = studentsWithDevices.ToList();
}
/// <summary>
/// Connects to the MQTT broker, subscribes to the configured topic and processes the
/// queued messages batch by batch until the host requests shutdown.
/// </summary>
/// <param name="stoppingToken">Token signalled when the host is stopping.</param>
/// <returns>A task that runs for the lifetime of the message loop.</returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    // Attach the handler BEFORE connecting/subscribing so that messages delivered
    // immediately after the subscription is accepted are not dropped.
    _client.ApplicationMessageReceivedAsync += e =>
    {
        var payloadBytes = e.ApplicationMessage.Payload;
        var msg = new MqttMessage
        {
            Topic = e.ApplicationMessage.Topic,
            Payload = payloadBytes == null ? string.Empty : Encoding.UTF8.GetString(payloadBytes),
            ReceivedAt = DateTime.UtcNow
        };
        return _queue.Writer.WriteAsync(msg, stoppingToken).AsTask();
    };

    await _client.ConnectAsync(new MqttClientOptionsBuilder()
        .WithTcpServer(AppSettings.Mqtt.Host, AppSettings.Mqtt.Port)
        .Build(), stoppingToken);
    await _client.SubscribeAsync(new MqttTopicFilterBuilder()
        .WithTopic(AppSettings.Mqtt.Topic)
        .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
        .Build(), stoppingToken);

    await foreach (var batch in ReadBatchesAsync(stoppingToken))
    {
        foreach (var msg in batch)
        {
            var devices = DeserializeDevices(msg);
            if (devices is null) continue;

            // Heart-rate devices of a message are handled first, then jump ropes.
            foreach (var ble in devices.Where(x => BleNameContains(x, "GTY0")))
                CollectHeartRate(ble);
            foreach (var ble in devices.Where(x => BleNameContains(x, "RS207")))
                CollectJumpRope(ble);
        }

        // Heart rate: saved at most once per minute.
        await SaveHeartRatesIfDueAsync();
        // Jump rope: daily aggregates upserted at most once per minute.
        await SaveJumpRopesIfDueAsync();
    }
}

// Byte sequence searched for inside a jump-rope payload; the count, error and battery
// bytes are read at fixed offsets from where it starts.
// NOTE(review): presumably a manufacturer-data marker - confirm against the device protocol.
private static readonly byte[] JumpRopeMarker = { 0xFF, 0x04, 0xFF, 0xCF };

// True when the device entry has a name that contains the given marker (null-safe).
private static bool BleNameContains(BlePayload ble, string marker)
    => ble.BleName != null && ble.BleName.Contains(marker);

// Deserializes a message payload into its device entries; returns null when there is nothing usable.
private List<BlePayload>? DeserializeDevices(MqttMessage msg)
{
    if (string.IsNullOrWhiteSpace(msg.Payload)) return null;
    try
    {
        var list = JsonSerializer.Deserialize<List<BlePayload>>(msg.Payload, _jsonOpt);
        return list is { Count: > 0 } ? list : null;
    }
    catch (JsonException ex)
    {
        _log.LogWarning(ex, "Payload 不是合法 JSON: {Json}", msg.Payload);
        return null;
    }
}

// Decodes one heart-rate entry, queues it for the next database save and publishes it to the cache.
private void CollectHeartRate(BlePayload ble)
{
    if (string.IsNullOrWhiteSpace(ble.RawData)) return;
    var student = _studentList.FirstOrDefault(x => x.HeartRateId == ble.BleName);
    if (student == null || student.GradeId == 0 || student.ClassId == 0) return;
    var data = ParseHexData(ble.RawData);
    if (data == null) return;

    // The battery and heart-rate bytes directly follow the first 0xCD byte.
    int cd = Array.IndexOf(data, (byte)0xCD);
    if (cd < 0 || data.Length < cd + 9) return;
    int battery = data[cd + 1];
    int heartRate = data[cd + 2];
    if (heartRate == 0) return;

    var entity = new HeartRateData
    {
        ScoreTime = ble.Timestamp,
        Code = ble.BleName,
        GradeId = student.GradeId,
        GradeName = student.GradeName,
        ClassId = student.ClassId,
        ClassName = student.ClassName ?? "",
        SchoolCode = student.SchoolCode ?? "",
        Sex = student.Sex,
        Value = heartRate,
        QuantityOfElectricity = battery,
        StudentNo = student.StudentNo,
        StudentName = student.StudentName,
        // Intensity as a percentage of (220 - age).
        Strength = (int)Math.Round(((double)heartRate / (220 - student.Age)) * 100)
    };
    _pendingHeartRates.Add(entity);

    // Latest reading is cached for one minute.
    _caching.AddObject($"heartRate:{student.StudentNo}", entity, 60);

    // Maintain the per-school set of student numbers (a set, so duplicates collapse).
    var studentSetKey = $"school:{student.SchoolCode}:students";
    RedisHelper.SAdd(studentSetKey, student.StudentNo);
    RedisHelper.Expire(studentSetKey, 60);
}

// Decodes one jump-rope entry, merges it into the daily aggregate and publishes it to the cache.
private void CollectJumpRope(BlePayload ble)
{
    if (string.IsNullOrWhiteSpace(ble.RawData)) return;
    var student = _studentList.FirstOrDefault(x => x.JumpRopeId == ble.BleName);
    if (student == null || student.GradeId == 0 || student.ClassId == 0) return;
    var data = ParseHexData(ble.RawData);
    if (data == null) return;

    // Subsequence search. Array.IndexOf(data, byte[]) compares each element with the
    // array object itself and therefore never matches.
    int mfIndex = data.AsSpan().IndexOf(JumpRopeMarker.AsSpan());
    if (mfIndex < 0 || data.Length < mfIndex + 10) return;
    int jumpCount = data[mfIndex + 5] + (data[mfIndex + 6] << 8); // 16-bit, low byte first
    int errorCount = data[mfIndex + 7];
    int battery = data[mfIndex + 9];

    var jumpData = new JumpRopeData
    {
        ScoreTime = ble.Timestamp,
        Code = ble.BleName,
        GradeId = student.GradeId,
        GradeName = student.GradeName,
        ClassId = student.ClassId,
        ClassName = student.ClassName ?? "",
        SchoolCode = student.SchoolCode ?? "",
        Sex = student.Sex,
        JumpValue = jumpCount,
        QuantityOfElectricity = battery,
        ErrorNumber = errorCount,
        StudentNo = student.StudentNo,
        StudentName = student.StudentName
    };

    // One aggregate per student per day.
    // NOTE(review): counts are summed per received frame; if the device reports a
    // running total rather than a delta this over-counts - confirm with the protocol.
    string key = $"{jumpData.StudentNo}_{jumpData.ScoreTime:yyyyMMdd}";
    if (_jumpDailyMap.TryGetValue(key, out var daily))
    {
        daily.JumpValue += jumpData.JumpValue;
        daily.ErrorNumber += jumpData.ErrorNumber;
        daily.QuantityOfElectricity = jumpData.QuantityOfElectricity;
        daily.ScoreTime = jumpData.ScoreTime;
    }
    else
    {
        _jumpDailyMap[key] = jumpData;
    }

    var studentSetKey = $"school:{student.SchoolCode}:students";
    _caching.AddObject($"jumpRope:{student.StudentNo}", jumpData, 60);
    RedisHelper.SAdd(studentSetKey, student.StudentNo);
    // Same TTL as the heart-rate path, so a set fed only by jump ropes also expires.
    RedisHelper.Expire(studentSetKey, 60);
}

// Saves the pending heart-rate rows when at least 60 seconds have passed since the last save.
private async Task SaveHeartRatesIfDueAsync()
{
    if ((DateTime.UtcNow - _lastHeartRateSaveTime).TotalSeconds < 60 || _pendingHeartRates.Count == 0) return;

    _userContext.HeartRateData.AddRange(_pendingHeartRates);
    await _userContext.SaveChangesAsync();
    _pendingHeartRates.Clear();
    _lastHeartRateSaveTime = DateTime.UtcNow;
}

// Inserts or updates the daily jump-rope rows when at least 60 seconds have passed since the last save.
private async Task SaveJumpRopesIfDueAsync()
{
    if ((DateTime.UtcNow - _lastJumpRopeSaveTime).TotalSeconds < 60 || _jumpDailyMap.Count == 0) return;

    // NOTE(review): the map is never pruned, so every aggregate ever seen is looked up again here.
    foreach (var data in _jumpDailyMap.Values)
    {
        var exist = await _userContext.JumpRopeData
            .FirstOrDefaultAsync(x => x.StudentNo == data.StudentNo && x.ScoreTime.Date == data.ScoreTime.Date);
        if (exist != null)
        {
            exist.JumpValue = data.JumpValue;
            exist.ErrorNumber = data.ErrorNumber;
            exist.QuantityOfElectricity = data.QuantityOfElectricity;
            exist.ScoreTime = data.ScoreTime;
        }
        else
        {
            _userContext.JumpRopeData.Add(data);
        }
    }
    await _userContext.SaveChangesAsync();
    _lastJumpRopeSaveTime = DateTime.UtcNow;
}
/// <summary>
/// Decodes a hexadecimal string into bytes, two characters per byte.
/// </summary>
/// <param name="hex">
/// Hex text (upper- or lower-case digits). When the length is odd the trailing
/// unpaired character is ignored.
/// </param>
/// <returns>
/// The decoded bytes (an empty array for empty input), or <c>null</c> when
/// <paramref name="hex"/> is null or any character pair is not two hex digits.
/// </returns>
private static byte[]? ParseHexData(string hex)
{
    if (hex is null) return null;

    int len = hex.Length / 2;
    var bytes = new byte[len];
    for (int i = 0; i < len; i++)
    {
        // TryParse reports malformed input without throwing, and slicing a span
        // avoids allocating a substring per byte.
        bool ok = byte.TryParse(
            hex.AsSpan(i * 2, 2),
            System.Globalization.NumberStyles.AllowHexSpecifier,
            System.Globalization.CultureInfo.InvariantCulture,
            out bytes[i]);
        if (!ok) return null;
    }
    return bytes;
}
/// <summary>
/// Reads queued messages and yields them in batches of at most
/// <c>AppSettings.Mqtt.BatchSize</c> items.
/// </summary>
/// <param name="ct">Token that cancels waiting for further messages.</param>
/// <returns>
/// An asynchronous sequence of non-empty batches. A batch is yielded as soon as it is
/// full, or as soon as the queue has been drained, so a slow trickle of messages is
/// not held back waiting for a full batch.
/// </returns>
private async IAsyncEnumerable<List<MqttMessage>> ReadBatchesAsync(
    [EnumeratorCancellation] CancellationToken ct)
{
    // A non-positive configured size would never trigger the "full" check below.
    int batchSize = Math.Max(1, AppSettings.Mqtt.BatchSize);
    var buffer = new List<MqttMessage>(batchSize);

    while (await _queue.Reader.WaitToReadAsync(ct))
    {
        while (_queue.Reader.TryRead(out var m))
        {
            buffer.Add(m);
            if (buffer.Count >= batchSize)
            {
                yield return buffer;
                buffer = new List<MqttMessage>(batchSize);
            }
        }

        // Queue is empty for now: hand over what has been collected instead of
        // holding it until enough later messages arrive to fill the batch.
        if (buffer.Count > 0)
        {
            yield return buffer;
            buffer = new List<MqttMessage>(batchSize);
        }
    }
}
}
}