2025-07-25 14:18:27 +08:00

417 lines
18 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.FileSystemGlobbing.Internal;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System.Collections.Concurrent;
using System.Globalization;
using System.Reflection.PortableExecutable;
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.Json;
using System.Threading.Channels;
using YD_AllHeartRates.Api.Context;
using YD_AllHeartRates.Api.Entitys;
using YD_AllHeartRates.Api.SmartSportsEntitys;
using YD_AllHeartRates.Api.Utilities;
using YD_AllHeartRates.Api.WebSocket;
using YD_AllHeartRates.Commons.Dto.Mqtt;
using YD_AllHeartRates.Commons.Dto.Student;
using YD_AllHeartRates.Commons.MemoryCaches;
namespace YD_AllHeartRates.Api.Mqtt
{
/// <summary>
/// 接受写入数据
/// </summary>
public class MqttBackgroundService : BackgroundService
{
private readonly IMqttClient _client;
private readonly ILogger _log;
private readonly UserContext _userContext;
private readonly SmartSportsContext _smartSportsContext;
private readonly ICaching _caching;
private readonly IServiceScopeFactory _scopeFactory;
private readonly List<HeartRateData> _pendingHeartRates = new();
private DateTime _lastHeartRateSaveTime = DateTime.Now;
private DateTime _lastJumpRopeSaveTime = DateTime.Now;
private readonly Channel<MqttMessage> _queue = Channel.CreateUnbounded<MqttMessage>();
static readonly JsonSerializerOptions _jsonOpt = new() { PropertyNameCaseInsensitive = true };
public MqttBackgroundService(
IMqttClient client,
ILogger<MqttBackgroundService> log,
UserContext userContext,
SmartSportsContext smartSportsContext,
ICaching caching,
IServiceScopeFactory scopeFactory
)
{
_client = client;
_log = log;
_userContext = userContext;
_smartSportsContext = smartSportsContext;
_caching = caching;
_scopeFactory = scopeFactory;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_ = RunJumpSyncLoopAsync(stoppingToken);
await _client.ConnectAsync(new MqttClientOptionsBuilder()
.WithTcpServer(AppSettings.Mqtt.Host, AppSettings.Mqtt.Port)
.Build(), stoppingToken);
await _client.SubscribeAsync(new MqttTopicFilterBuilder()
.WithTopic(AppSettings.Mqtt.Topic)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build(), stoppingToken);
_client.ApplicationMessageReceivedAsync += e =>
{
var msg = new MqttMessage
{
Topic = e.ApplicationMessage.Topic,
Payload = e.ApplicationMessage.Payload == null ? string.Empty : Encoding.UTF8.GetString(e.ApplicationMessage.Payload),
ReceivedAt = DateTime.Now
};
return _queue.Writer.WriteAsync(msg, stoppingToken).AsTask();
};
await foreach (var batch in ReadBatchesAsync(stoppingToken))
{
List<StudentDto> studentList;
List<S_Device> devices;
studentList = _caching.Get<List<StudentDto>>(AppSettings.StudentListCacheKey);
devices = _caching.Get<List<S_Device>>(AppSettings.DeviceListCacheKey);
if (studentList == null || devices == null)
{
studentList = (from d in _smartSportsContext.Device
join s in _smartSportsContext.Student on d.StudentNo equals s.StudentNo
join c in _smartSportsContext.Class on s.ClassId equals c.Id
where s.StudentStatus == 1
select new StudentDto
{
SchoolCode = s.SchoolCode,
StudentNo = s.StudentNo,
StudentName = s.StudentName,
Sex = s.Sex,
Age = s.Age,
HeartRateId = d.Code,
JumpRopeId = d.Code,
ClassId = s.ClassId,
ClassName = s.ClassName,
GradeId = c.GradeId,
GradeName = c.GradeName ?? "",
DeviceType = d.DeviceType,
}).ToList();
devices = _smartSportsContext.Device.ToList();
_caching.AddObject(AppSettings.StudentListCacheKey, studentList, 28800);
_caching.AddObject(AppSettings.DeviceListCacheKey, devices, 28800);
}
var heartRateEntities = new ConcurrentBag<HeartRateData>();
var jumpRopeEntities = new ConcurrentBag<JumpRopeData>();
var deviceHMap = devices.Where(x => x.DeviceType == 1).ToDictionary(x => x.Code, x => x.StudentNo);
var deviceJMap = devices.Where(x => x.DeviceType == 2).ToDictionary(x => x.Code, x => x.StudentNo);
var studentMap = studentList.GroupBy(x => x.StudentNo).ToDictionary(g => g.Key, g => g.First());
Parallel.ForEach(batch, new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }, msg =>
{
try
{
if (string.IsNullOrWhiteSpace(msg.Payload)) return;
var payload = JsonSerializer.Deserialize<BlePayload>(msg.Payload, _jsonOpt);
if (payload == null || payload.devices == null || payload.devices.Count == 0) return;
var scoreTime = DateTime.ParseExact(payload.send_time, "yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture);
foreach (var ble in payload.devices)
{
if (string.IsNullOrWhiteSpace(ble.Broadcast_data)) continue;
var data = ParseHexData(ble.Broadcast_data);
if (data == null) continue;
if (ble.bs_name.Contains("GTY0"))
{
if (!deviceHMap.TryGetValue(ble.bs_name, out var studentNo)) continue;
if (!studentMap.TryGetValue(studentNo, out var student)) continue;
int cd = Array.IndexOf(data, (byte)0xCD);
if (cd < 0 || data.Length < cd + 9) continue;
int battery = data[cd + 1];
int heartRate = data[cd + 2];
if (heartRate == 0) continue;
var entity = new HeartRateData
{
ScoreTime = scoreTime,
Code = ble.bs_name,
GradeId = student.GradeId,
GradeName = student.GradeName,
ClassId = student.ClassId,
ClassName = student.ClassName ?? "",
SchoolCode = student.SchoolCode ?? "",
Sex = student.Sex,
Value = heartRate,
QuantityOfElectricity = battery,
StudentNo = student.StudentNo,
StudentName = student.StudentName,
Strength = (int)Math.Round(((double)heartRate / (220 - student.Age)) * 100)
};
heartRateEntities.Add(entity);
_caching.AddObject($"heartRate:{student.StudentNo}", entity, 60);
RedisHelper.SAdd($"school:{student.SchoolCode}:students", student.StudentNo);
}
else if (ble.bs_name.Contains("RS207"))
{
if (!deviceJMap.TryGetValue(ble.bs_name, out var studentNo)) continue;
if (!studentMap.TryGetValue(studentNo, out var student)) continue;
int mfIndex = IndexOfSequence(data, new byte[] { 0xFF, 0x04, 0xFF, 0xCF });
if (mfIndex < 0 || data.Length < mfIndex + 10) continue;
int jumpCount = data[mfIndex + 5];
int errorCount = data[mfIndex + 7];
int battery = data[mfIndex + 10];
var jumpData = new JumpRopeData
{
ScoreTime = scoreTime,
Code = ble.bs_name,
GradeId = student.GradeId,
GradeName = student.GradeName,
ClassId = student.ClassId,
ClassName = student.ClassName ?? "",
SchoolCode = student.SchoolCode ?? "",
Sex = student.Sex,
JumpValue = jumpCount,
QuantityOfElectricity = battery,
ErrorNumber = errorCount,
StudentNo = student.StudentNo,
StudentName = student.StudentName
};
var jumpKey = $"jumpRope:active:{student.StudentNo}:{DateTime.Now:yyyyMMdd}";
var rawKey = $"jumpRope:raw:{student.StudentNo}:{DateTime.Now:yyyyMMdd}";
var totalData = _caching.Get<JumpRopeData>(jumpKey) ?? new JumpRopeData
{
Id = jumpData.Id,
StudentNo = jumpData.StudentNo,
ClassId = jumpData.ClassId,
ClassName = jumpData.ClassName,
Code = jumpData.Code,
GradeId = jumpData.GradeId,
GradeName = jumpData.GradeName,
SchoolCode = jumpData.SchoolCode,
Sex = jumpData.Sex,
StudentName = jumpData.StudentName,
JumpValue = 0,
ErrorNumber = 0,
QuantityOfElectricity = jumpData.QuantityOfElectricity,
ScoreTime = jumpData.ScoreTime
};
var lastRaw = _caching.Get<JumpRopeData>(rawKey);
int deltaJump = jumpData.JumpValue;
int deltaError = jumpData.ErrorNumber;
if (lastRaw != null)
{
deltaJump = jumpData.JumpValue >= lastRaw.JumpValue
? jumpData.JumpValue - lastRaw.JumpValue
: jumpData.JumpValue;
deltaError = jumpData.ErrorNumber >= lastRaw.ErrorNumber
? jumpData.ErrorNumber - lastRaw.ErrorNumber
: jumpData.ErrorNumber;
}
totalData.JumpValue += deltaJump;
totalData.ErrorNumber += deltaError;
totalData.QuantityOfElectricity = jumpData.QuantityOfElectricity;
totalData.ScoreTime = jumpData.ScoreTime;
_caching.AddObject(jumpKey, totalData, 28800);
_caching.AddObject(rawKey, jumpData, 60);
}
}
}
catch (Exception ex)
{
_log.LogError(ex, "处理 BLE 消息异常");
}
});
// 心率每分钟保存一次
if ((DateTime.Now - _lastHeartRateSaveTime).TotalSeconds >= 60 && _pendingHeartRates.Any())
{
_userContext.HeartRateData.AddRange(_pendingHeartRates);
await _userContext.SaveChangesAsync();
_pendingHeartRates.Clear();
_lastHeartRateSaveTime = DateTime.Now;
}
_pendingHeartRates.AddRange(heartRateEntities);
// 跳绳每日更新保存
// 每隔 60 秒执行一次
//_ = Task.Run(async () =>
//{
// while (true)
// {
// await SyncTodayJumpDataToDbAsync();
// await Task.Delay(10000); // 每 10 秒检查一次
// }
//});
}
}
private async Task RunJumpSyncLoopAsync(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
try
{
await SyncTodayJumpDataToDbAsync();
}
catch (Exception ex)
{
_log.LogError(ex, "跳绳数据同步异常");
}
try
{
await Task.Delay(TimeSpan.FromSeconds(10), token);
}
catch (TaskCanceledException)
{
// 如果是取消任务导致 delay 中断,直接退出
break;
}
}
_log.LogInformation("跳绳数据同步任务已退出");
}
private byte[]? ParseHexData(string hex)
{
try
{
int len = hex.Length / 2;
var bytes = new byte[len];
for (int i = 0; i < len; i++)
bytes[i] = Convert.ToByte(hex.Substring(i * 2, 2), 16);
return bytes;
}
catch
{
return null;
}
}
private async IAsyncEnumerable<List<MqttMessage>> ReadBatchesAsync(
[EnumeratorCancellation] CancellationToken ct)
{
var buffer = new List<MqttMessage>(AppSettings.Mqtt.BatchSize);
while (await _queue.Reader.WaitToReadAsync(ct))
{
while (_queue.Reader.TryRead(out var m))
{
buffer.Add(m);
if (buffer.Count == AppSettings.Mqtt.BatchSize)
{
yield return buffer;
buffer = new List<MqttMessage>(AppSettings.Mqtt.BatchSize);
}
}
}
if (buffer.Count > 0) yield return buffer;
}
public static int IndexOfSequence(byte[] buffer, byte[] pattern)
{
if (pattern.Length == 0 || buffer.Length < pattern.Length)
return -1;
for (int i = 0; i <= buffer.Length - pattern.Length; i++)
{
bool matched = true;
for (int j = 0; j < pattern.Length; j++)
{
if (buffer[i + j] != pattern[j])
{
matched = false;
break;
}
}
if (matched)
return i;
}
return -1;
}
public async Task SyncTodayJumpDataToDbAsync()
{
try
{
if ((DateTime.Now - _lastJumpRopeSaveTime).TotalSeconds < 60)
return;
var today = DateTime.Today;
var dateStr = today.ToString("yyyyMMdd");
var keys = RedisHelper.Keys($"jumpRope:active:*:{dateStr}");
using var scope = _scopeFactory.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<UserContext>();
foreach (var key in keys)
{
var data = _caching.Get<JumpRopeData>(key);
if (data == null || string.IsNullOrWhiteSpace(data.StudentNo)) continue;
var exist = await dbContext.JumpRopeData
.FirstOrDefaultAsync(x => x.StudentNo == data.StudentNo && x.ScoreTime.Date == today);
if (exist != null)
{
exist.JumpValue = data.JumpValue;
exist.ErrorNumber = data.ErrorNumber;
exist.QuantityOfElectricity = data.QuantityOfElectricity;
exist.ScoreTime = data.ScoreTime;
}
else
{
dbContext.JumpRopeData.Add(data);
}
}
await dbContext.SaveChangesAsync();
_lastJumpRopeSaveTime = DateTime.Now;
}
catch (Exception ex)
{
_log.LogError(ex, "SyncTodayJumpDataToDbAsync数据同步错误。");
}
}
}
}