2025-07-21 10:24:05 +08:00

431 lines
18 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using Microsoft.EntityFrameworkCore;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System.Reflection.PortableExecutable;
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.Json;
using System.Threading.Channels;
using YD_AllHeartRates.Api.Context;
using YD_AllHeartRates.Api.Entitys;
using YD_AllHeartRates.Api.SmartSportsEntitys;
using YD_AllHeartRates.Api.Utilities;
using YD_AllHeartRates.Api.WebSocket;
using YD_AllHeartRates.Commons.Dto.Mqtt;
using YD_AllHeartRates.Commons.Dto.Student;
using YD_AllHeartRates.Commons.MemoryCaches;
namespace YD_AllHeartRates.Api.Mqtt
{
/// <summary>
/// Background service that receives MQTT messages, decodes the BLE payloads they carry
/// (heart-rate and jump-rope readings) and writes the results to the cache and the database.
/// </summary>
public class MqttBackgroundService : BackgroundService
{
// MQTT client used to connect to the broker, subscribe and receive messages.
private readonly IMqttClient _client;
// Logger for warnings and errors raised while processing.
private readonly ILogger _log;
// Database context injected at construction; exposes the HeartRateData set.
private readonly UserContext _userContext;
// Database context queried once in the constructor to load the student/device roster.
private readonly SmartSportsContext _smartSportsContext;
// Cache abstraction holding the latest heart-rate and jump-rope readings.
private readonly ICaching _caching;
// Creates DI scopes so a fresh UserContext can be resolved for the jump-rope sync.
private readonly IServiceScopeFactory _scopeFactory;
// Roster of active students with their device codes; replaced by the constructor's query result.
private readonly List<StudentDto> _studentList = new();
// Heart-rate rows collected since the last database save.
private readonly List<HeartRateData> _pendingHeartRates = new();
// Local time of the last heart-rate save (initialised to service creation time).
private DateTime _lastHeartRateSaveTime = DateTime.Now;
// Local time of the last jump-rope sync (initialised to service creation time).
private DateTime _lastJumpRopeSaveTime = DateTime.Now;
// Unbounded queue between the MQTT receive handler (writer) and the batch reader.
private readonly Channel<MqttMessage> _queue = Channel.CreateUnbounded<MqttMessage>();
// JSON options: property names in incoming payloads are matched case-insensitively.
static readonly JsonSerializerOptions _jsonOpt = new() { PropertyNameCaseInsensitive = true };
/// <summary>
/// Stores the injected dependencies and loads the roster of active students
/// (StudentStatus == 1) joined with their devices and classes.
/// </summary>
/// <remarks>
/// The roster query runs synchronously here, so the list is a snapshot taken
/// when the service is constructed.
/// </remarks>
public MqttBackgroundService(
    IMqttClient client,
    ILogger<MqttBackgroundService> log,
    UserContext userContext,
    SmartSportsContext smartSportsContext,
    ICaching caching,
    IServiceScopeFactory scopeFactory
    )
{
    _scopeFactory = scopeFactory;
    _caching = caching;
    _smartSportsContext = smartSportsContext;
    _userContext = userContext;
    _log = log;
    _client = client;

    // Device -> Student -> Class; the same device code is exposed as both
    // HeartRateId and JumpRopeId.
    var rosterQuery =
        from device in _smartSportsContext.Device
        join student in _smartSportsContext.Student
            on device.StudentNo equals student.StudentNo
        join schoolClass in _smartSportsContext.Class
            on student.ClassId equals schoolClass.Id
        where student.StudentStatus == 1
        select new StudentDto
        {
            SchoolCode = student.SchoolCode,
            StudentNo = student.StudentNo,
            StudentName = student.StudentName,
            Sex = student.Sex,
            Age = student.Age,
            HeartRateId = device.Code,
            JumpRopeId = device.Code,
            ClassId = student.ClassId,
            ClassName = student.ClassName,
            GradeId = schoolClass.GradeId,
            GradeName = schoolClass.GradeName ?? "",
            DeviceType = device.DeviceType,
        };

    _studentList = rosterQuery.ToList();
}
/// <summary>
/// Main loop of the service: starts the jump-rope sync loop, connects to the MQTT broker,
/// subscribes to the configured topic and processes queued messages in batches.
/// </summary>
/// <param name="stoppingToken">Signalled when the host is shutting down.</param>
/// <remarks>
/// Each message payload is expected to be a JSON array of <c>BlePayload</c> items.
/// Items whose name contains "GTY0" are handled as heart-rate readings and items whose
/// name contains "RS207" as jump-rope readings.
/// </remarks>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    // Fire-and-forget: the loop catches and logs its own exceptions.
    _ = RunJumpSyncLoopAsync(stoppingToken);

    // Attach the handler BEFORE connecting/subscribing, otherwise messages delivered
    // between the subscribe call and the handler registration are dropped.
    _client.ApplicationMessageReceivedAsync += e =>
    {
        var msg = new MqttMessage
        {
            Topic = e.ApplicationMessage.Topic,
            Payload = e.ApplicationMessage.Payload == null ? string.Empty : Encoding.UTF8.GetString(e.ApplicationMessage.Payload),
            ReceivedAt = DateTime.Now
        };
        return _queue.Writer.WriteAsync(msg, stoppingToken).AsTask();
    };

    await _client.ConnectAsync(new MqttClientOptionsBuilder()
        .WithTcpServer(AppSettings.Mqtt.Host, AppSettings.Mqtt.Port)
        .Build(), stoppingToken);
    await _client.SubscribeAsync(new MqttTopicFilterBuilder()
        .WithTopic(AppSettings.Mqtt.Topic)
        .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
        .Build(), stoppingToken);

    await foreach (var batch in ReadBatchesAsync(stoppingToken))
    {
        var heartRateEntities = new List<HeartRateData>();
        foreach (var msg in batch)
        {
            if (string.IsNullOrWhiteSpace(msg.Payload)) continue;

            List<BlePayload>? list;
            try
            {
                list = JsonSerializer.Deserialize<List<BlePayload>>(msg.Payload, _jsonOpt);
            }
            catch (JsonException ex)
            {
                _log.LogWarning(ex, "Payload 不是合法 JSON: {Json}", msg.Payload);
                continue;
            }
            if (list is null || list.Count == 0) continue;

            // Null-safe filters: a null element or a missing BleName must not
            // throw and take the whole service down.
            var heartRateList = list.Where(x => x?.BleName?.Contains("GTY0") == true).ToList();
            var jumpRopeList = list.Where(x => x?.BleName?.Contains("RS207") == true).ToList();

            foreach (var ble in heartRateList)
            {
                ProcessHeartRateReading(ble, heartRateEntities);
            }
            foreach (var ble in jumpRopeList)
            {
                ProcessJumpRopeReading(ble);
            }
        }

        // Queue this batch first so it is included in the save below instead of
        // waiting for a later batch to arrive.
        _pendingHeartRates.AddRange(heartRateEntities);

        // Heart-rate rows are persisted at most once per minute.
        await FlushPendingHeartRatesAsync();
    }
}

/// <summary>
/// Decodes one heart-rate advertisement, appends the resulting row to <paramref name="sink"/>
/// and publishes it to the cache.
/// </summary>
private void ProcessHeartRateReading(BlePayload ble, List<HeartRateData> sink)
{
    if (string.IsNullOrWhiteSpace(ble.RawData)) return;

    var student = _studentList.FirstOrDefault(x => x.HeartRateId == ble.BleName);
    if (student == null || student.GradeId == 0 || student.ClassId == 0) return;

    var data = ParseHexData(ble.RawData);
    if (data == null) return;

    // Locate the 0xCD marker; the two bytes after it are read as battery and heart rate.
    int cd = Array.IndexOf(data, (byte)0xCD);
    if (cd < 0 || data.Length < cd + 9) return;
    int battery = data[cd + 1];
    int heartRate = data[cd + 2];
    if (heartRate == 0) return;

    // Strength is the heart rate as a percentage of (220 - age). Guard the divisor so an
    // implausible age cannot produce Infinity/NaN, whose cast to int is not well defined.
    var maxHeartRate = 220 - student.Age;
    int strength = maxHeartRate > 0
        ? (int)Math.Round(((double)heartRate / maxHeartRate) * 100)
        : 0;

    var entity = new HeartRateData
    {
        ScoreTime = ble.Timestamp,
        Code = ble.BleName,
        GradeId = student.GradeId,
        GradeName = student.GradeName,
        ClassId = student.ClassId,
        ClassName = student.ClassName ?? "",
        SchoolCode = student.SchoolCode ?? "",
        Sex = student.Sex,
        Value = heartRate,
        QuantityOfElectricity = battery,
        StudentNo = student.StudentNo,
        StudentName = student.StudentName,
        Strength = strength
    };
    sink.Add(entity);

    // Latest reading per student, cached for one minute.
    var heartRateKey = $"heartRate:{student.StudentNo}";
    _caching.AddObject(heartRateKey, entity, 60);

    // Per-school set of student numbers (a set de-duplicates automatically).
    var studentSetKey = $"school:{student.SchoolCode}:students";
    RedisHelper.SAdd(studentSetKey, student.StudentNo);
    RedisHelper.Expire(studentSetKey, 60);
}

/// <summary>
/// Decodes one jump-rope advertisement and folds it into the student's cached daily total.
/// </summary>
private void ProcessJumpRopeReading(BlePayload ble)
{
    if (string.IsNullOrWhiteSpace(ble.RawData)) return;

    var student = _studentList.FirstOrDefault(x => x.JumpRopeId == ble.BleName);
    if (student == null || student.GradeId == 0 || student.ClassId == 0) return;

    var data = ParseHexData(ble.RawData);
    if (data == null) return;

    int mfIndex = IndexOfSequence(data, new byte[] { 0xFF, 0x04, 0xFF, 0xCF });
    // The highest offset read below is mfIndex + 10, so the buffer needs at least
    // mfIndex + 11 bytes (the previous "< mfIndex + 10" check allowed an out-of-range read).
    if (mfIndex < 0 || data.Length <= mfIndex + 10) return;
    int jumpCount = data[mfIndex + 5];
    int errorCount = data[mfIndex + 7];
    int battery = data[mfIndex + 10];

    var jumpData = new JumpRopeData
    {
        ScoreTime = ble.Timestamp,
        Code = ble.BleName,
        GradeId = student.GradeId,
        GradeName = student.GradeName,
        ClassId = student.ClassId,
        ClassName = student.ClassName ?? "",
        SchoolCode = student.SchoolCode ?? "",
        Sex = student.Sex,
        JumpValue = jumpCount,
        QuantityOfElectricity = battery,
        ErrorNumber = errorCount,
        StudentNo = student.StudentNo,
        StudentName = student.StudentName
    };

    // Format the date once so both keys always refer to the same day, even at midnight.
    var dayStamp = $"{DateTime.Now:yyyyMMdd}";
    var jumpKey = $"jumpRope:active:{student.StudentNo}:{dayStamp}";
    var rawKey = $"jumpRope:raw:{student.StudentNo}:{dayStamp}";

    // Running total for today; created on the first reading of the day.
    var totalData = _caching.Get<JumpRopeData>(jumpKey) ?? new JumpRopeData
    {
        StudentNo = jumpData.StudentNo,
        JumpValue = 0,
        ErrorNumber = 0,
        QuantityOfElectricity = jumpData.QuantityOfElectricity,
        ScoreTime = jumpData.ScoreTime
    };

    // Previous raw counter values, used to turn the device's running counters into deltas.
    var lastRaw = _caching.Get<JumpRopeData>(rawKey);
    int deltaJump = jumpData.JumpValue;
    int deltaError = jumpData.ErrorNumber;
    if (lastRaw != null)
    {
        // A counter lower than the previous one is treated as a restart:
        // the whole new value counts as the delta.
        deltaJump = jumpData.JumpValue >= lastRaw.JumpValue
            ? jumpData.JumpValue - lastRaw.JumpValue
            : jumpData.JumpValue;
        deltaError = jumpData.ErrorNumber >= lastRaw.ErrorNumber
            ? jumpData.ErrorNumber - lastRaw.ErrorNumber
            : jumpData.ErrorNumber;
    }

    totalData.JumpValue += deltaJump;
    totalData.ErrorNumber += deltaError;
    totalData.QuantityOfElectricity = jumpData.QuantityOfElectricity;
    totalData.ScoreTime = jumpData.ScoreTime;

    // Daily total is kept for 8 hours, the raw snapshot for 60 seconds.
    _caching.AddObject(jumpKey, totalData, 28800);
    _caching.AddObject(rawKey, jumpData, 60);

    // NOTE(review): SyncTodayJumpDataToDbAsync enumerates the Redis set
    // "jumpRope:active:{yyyyMMdd}", but nothing in this method adds the student number to
    // that set (the SAdd call that did so was commented out). Unless the set is populated
    // elsewhere, the periodic database sync will find no students - confirm the intent.
}

/// <summary>
/// Saves the buffered heart-rate rows when at least 60 seconds have passed since the last
/// attempt. A failed save is logged and the rows are kept for the next attempt.
/// </summary>
private async Task FlushPendingHeartRatesAsync()
{
    if (_pendingHeartRates.Count == 0) return;
    if ((DateTime.Now - _lastHeartRateSaveTime).TotalSeconds < 60) return;

    try
    {
        // Use a short-lived context per save (same approach as SyncTodayJumpDataToDbAsync):
        // a context that lives as long as the service keeps tracking every saved row.
        using var scope = _scopeFactory.CreateScope();
        var dbContext = scope.ServiceProvider.GetRequiredService<UserContext>();
        dbContext.HeartRateData.AddRange(_pendingHeartRates);
        await dbContext.SaveChangesAsync();
        _pendingHeartRates.Clear();
    }
    catch (Exception ex)
    {
        // A database outage must not terminate message processing.
        _log.LogError(ex, "Failed to save {Count} pending heart-rate rows; will retry.", _pendingHeartRates.Count);
    }
    finally
    {
        // Also throttles retries after a failure to one per minute.
        _lastHeartRateSaveTime = DateTime.Now;
    }
}
/// <summary>
/// Calls <see cref="SyncTodayJumpDataToDbAsync"/> repeatedly with a 10-second pause between
/// calls until <paramref name="token"/> is cancelled.
/// </summary>
/// <param name="token">Cancellation token that ends the loop.</param>
private async Task RunJumpSyncLoopAsync(CancellationToken token)
{
    var pause = TimeSpan.FromSeconds(10);

    for (; ; )
    {
        if (token.IsCancellationRequested)
        {
            break;
        }

        // A failed sync is logged and must not end the loop.
        try
        {
            await SyncTodayJumpDataToDbAsync();
        }
        catch (Exception ex)
        {
            _log.LogError(ex, "跳绳数据同步异常");
        }

        // Cancellation during the pause ends the loop immediately.
        try
        {
            await Task.Delay(pause, token);
        }
        catch (TaskCanceledException)
        {
            break;
        }
    }

    _log.LogInformation("跳绳数据同步任务已退出");
}
/// <summary>
/// Converts a hexadecimal string into bytes, two characters per byte.
/// </summary>
/// <param name="hex">Hex text without separators or prefix.</param>
/// <returns>
/// The decoded bytes, or <c>null</c> when <paramref name="hex"/> is null or contains a
/// character other than 0-9, a-f or A-F. An empty string yields an empty array.
/// </returns>
/// <remarks>
/// When the length is odd, the trailing unpaired character is ignored.
/// Decoding is done by hand so that malformed input is rejected without throwing and
/// catching exceptions and without allocating a substring per byte.
/// </remarks>
private byte[]? ParseHexData(string hex)
{
    if (hex is null)
    {
        return null;
    }

    int len = hex.Length / 2;
    var bytes = new byte[len];
    for (int i = 0; i < len; i++)
    {
        int high = HexValue(hex[2 * i]);
        int low = HexValue(hex[2 * i + 1]);
        if (high < 0 || low < 0)
        {
            return null;
        }
        bytes[i] = (byte)((high << 4) | low);
    }
    return bytes;

    // Returns the numeric value of a hex digit, or -1 for any other character.
    static int HexValue(char c) => c switch
    {
        >= '0' and <= '9' => c - '0',
        >= 'a' and <= 'f' => c - 'a' + 10,
        >= 'A' and <= 'F' => c - 'A' + 10,
        _ => -1,
    };
}
/// <summary>
/// Reads queued MQTT messages and yields them in batches of at most
/// <c>AppSettings.Mqtt.BatchSize</c> messages.
/// </summary>
/// <param name="ct">Cancels the wait for new messages.</param>
/// <returns>
/// A stream of non-empty batches. A batch is yielded when it is full, or as soon as the
/// queue has been drained, so a small number of messages is never held back waiting for
/// the batch to fill up.
/// </returns>
private async IAsyncEnumerable<List<MqttMessage>> ReadBatchesAsync(
    [EnumeratorCancellation] CancellationToken ct)
{
    // Guard against a zero or negative configured size, which would otherwise
    // never yield (0) or make the List constructor throw (negative).
    int batchSize = Math.Max(1, AppSettings.Mqtt.BatchSize);
    var buffer = new List<MqttMessage>(batchSize);

    while (await _queue.Reader.WaitToReadAsync(ct))
    {
        while (_queue.Reader.TryRead(out var m))
        {
            buffer.Add(m);
            if (buffer.Count >= batchSize)
            {
                yield return buffer;
                buffer = new List<MqttMessage>(batchSize);
            }
        }

        // Queue is empty for now: hand over the partial batch instead of keeping it
        // until enough further messages arrive.
        if (buffer.Count > 0)
        {
            yield return buffer;
            buffer = new List<MqttMessage>(batchSize);
        }
    }
}
/// <summary>
/// Finds the first occurrence of <paramref name="pattern"/> inside <paramref name="buffer"/>.
/// </summary>
/// <param name="buffer">Bytes to search.</param>
/// <param name="pattern">Byte sequence to look for.</param>
/// <returns>
/// Zero-based index of the first match, or -1 when there is no match, the pattern is
/// empty, or the pattern is longer than the buffer.
/// </returns>
public static int IndexOfSequence(byte[] buffer, byte[] pattern)
{
    int needleLength = pattern.Length;
    if (needleLength == 0 || buffer.Length < needleLength)
    {
        return -1;
    }

    ReadOnlySpan<byte> haystack = buffer;
    ReadOnlySpan<byte> needle = pattern;
    return haystack.IndexOf(needle);
}
/// <summary>
/// Copies today's cached jump-rope totals into the database, at most once every
/// 600 seconds. Existing rows for the same student and day are updated; otherwise the
/// cached record is inserted.
/// </summary>
/// <remarks>
/// Student numbers are read from the Redis set <c>jumpRope:active:{yyyyMMdd}</c> and each
/// student's total from the cache key <c>jumpRope:active:{studentNo}:{yyyyMMdd}</c>.
/// All exceptions are caught and logged; none are propagated to the caller.
/// </remarks>
public async Task SyncTodayJumpDataToDbAsync()
{
    try
    {
        // Throttle: do nothing until 10 minutes have passed since the last successful save.
        var sinceLastSave = DateTime.Now - _lastJumpRopeSaveTime;
        if (sinceLastSave.TotalSeconds < 600)
        {
            return;
        }

        var today = DateTime.Today;
        var dayStamp = today.ToString("yyyyMMdd");

        var activeStudents = RedisHelper.SMembers($"jumpRope:active:{dayStamp}");
        if (activeStudents == null || activeStudents.Length == 0)
        {
            return;
        }

        // A fresh scope gives this run its own UserContext instance.
        using var scope = _scopeFactory.CreateScope();
        var dbContext = scope.ServiceProvider.GetRequiredService<UserContext>();

        foreach (var studentNo in activeStudents)
        {
            var cached = _caching.Get<JumpRopeData>($"jumpRope:active:{studentNo}:{dayStamp}");
            if (cached == null || string.IsNullOrWhiteSpace(cached.StudentNo))
            {
                continue;
            }

            var row = await dbContext.JumpRopeData
                .FirstOrDefaultAsync(x => x.StudentNo == cached.StudentNo && x.ScoreTime.Date == today);

            if (row == null)
            {
                // No row for this student today yet: insert the cached record as-is.
                dbContext.JumpRopeData.Add(cached);
                continue;
            }

            // Overwrite today's row with the latest cached totals.
            row.JumpValue = cached.JumpValue;
            row.ErrorNumber = cached.ErrorNumber;
            row.QuantityOfElectricity = cached.QuantityOfElectricity;
            row.ScoreTime = cached.ScoreTime;
        }

        await dbContext.SaveChangesAsync();
        _lastJumpRopeSaveTime = DateTime.Now;
    }
    catch (Exception ex)
    {
        _log.LogError(ex, "SyncTodayJumpDataToDbAsync数据同步错误。");
    }
}
}
}