You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
haoliang-net/src/CncCollector/Core/CollectRecordWriter.cs

184 lines
8.6 KiB
C#

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using System;
using System.Collections.Generic;
using System.Text;
using Dapper;
using MySqlConnector;
using CncModels.Entity;
using log4net;
namespace CncCollector.Core
{
/// <summary>
/// 采集数据批量写入器。
/// 负责写入采集结构化记录、原始JSON日志更新机床和地址实时状态。
/// </summary>
public class CollectRecordWriter
{
private static readonly ILog _log = LogManager.GetLogger(typeof(CollectRecordWriter));
/// <summary>
/// 批量写入采集记录并更新相关状态
/// </summary>
/// <param name="businessConnStr">业务库连接字符串</param>
/// <param name="logConnStr">日志库连接字符串</param>
/// <param name="records">采集记录列表</param>
/// <param name="rawJson">原始JSON字符串</param>
/// <param name="collectAddressId">采集地址ID</param>
/// <param name="requestTime">请求开始时间</param>
/// <param name="responseDurationMs">响应耗时(毫秒)</param>
/// <param name="isSuccess">是否采集成功</param>
/// <param name="errorMessage">错误信息(失败时)</param>
public static long WriteBatch(string businessConnStr, string logConnStr,
List<CollectRecord> records, string rawJson, int collectAddressId,
DateTime requestTime, long? responseDurationMs, bool isSuccess, string errorMessage, int? statusCode = null)
{
var now = DateTime.Now;
long lastRawLogId = 0;
// 1. 写入原始JSON到日志库
try
{
using (var conn = new MySqlConnection(logConnStr))
{
conn.Open();
conn.Execute(@"INSERT INTO log_collect_raw (collect_address_id, request_time, response_time, response_duration, is_success, status_code, raw_json, error_message, created_at)
VALUES (@CollectAddressId, @RequestTime, @ResponseTime, @ResponseDuration, @IsSuccess, @StatusCode, @RawJson, @ErrorMessage, @CreatedAt)",
new
{
CollectAddressId = collectAddressId,
RequestTime = requestTime,
ResponseTime = now,
ResponseDuration = responseDurationMs,
IsSuccess = isSuccess ? 1 : 0,
StatusCode = statusCode ?? (isSuccess ? (int?)200 : null),
RawJson = rawJson ?? "",
ErrorMessage = errorMessage ?? (string)null,
CreatedAt = now
});
// 记录刚插入的 raw_log 的自增ID
try
{
lastRawLogId = conn.ExecuteScalar<long>("SELECT LAST_INSERT_ID();");
}
catch { lastRawLogId = 0; }
}
}
catch (Exception ex)
{
// 数据库不可用时详细错误信息通过log4net写入本地日志文件确保问题可追溯
_log.Error($"写入原始JSON日志失败地址ID={collectAddressId}, 成功={isSuccess}: {errorMessage}", ex);
}
if (!isSuccess || records == null || records.Count == 0) return lastRawLogId;
// 2. 批量写入采集结构化记录到业务库
try
{
using (var conn = new MySqlConnection(businessConnStr))
{
conn.Open();
using (var tran = conn.BeginTransaction())
{
try
{
foreach (var r in records)
{
conn.Execute(@"INSERT INTO cnc_collect_record (machine_id, collect_time, device_time, program_name, part_count,
device_status, run_status, operate_mode,
power_on_time, run_time, extra_data, created_at)
VALUES (@MachineId, @CollectTime, @DeviceTime, @ProgramName, @PartCount,
@DeviceStatus, @RunStatus, @OperateMode,
@PowerOnTime, @RunTime, @ExtraData, @CreatedAt)",
new
{
r.MachineId,
CollectTime = r.CollectTime,
DeviceTime = r.DeviceTime,
ProgramName = r.ProgramName ?? (string)null,
PartCount = r.PartCount,
DeviceStatus = r.DeviceStatus ?? (string)null,
RunStatus = r.RunStatus ?? (string)null,
OperateMode = r.OperateMode ?? (string)null,
PowerOnTime = r.PowerOnTime,
RunTime = r.RunTime,
ExtraData = r.ExtraData ?? (string)null,
CreatedAt = now
}, tran);
}
tran.Commit();
}
catch
{
tran.Rollback();
throw;
}
}
// 3. 更新每台机床的实时状态字段
foreach (var r in records)
{
try
{
conn.Execute(@"UPDATE cnc_machine SET last_collect_time = @CollectTime,
last_device_status = @DeviceStatus, last_run_status = @RunStatus,
last_program_name = @ProgramName, last_part_count = @PartCount,
last_operate_mode = @OperateMode
WHERE id = @MachineId",
new
{
r.MachineId,
r.CollectTime,
DeviceStatus = r.DeviceStatus ?? (string)null,
RunStatus = r.RunStatus ?? (string)null,
ProgramName = r.ProgramName ?? (string)null,
r.PartCount,
OperateMode = r.OperateMode ?? (string)null
});
}
catch (Exception ex)
{
_log.Error($"更新机床实时状态失败machine_id={r.MachineId}", ex);
}
}
// 4. 更新采集地址状态
try
{
conn.Execute(@"UPDATE cnc_collect_address SET last_collect_time = @Time, last_collect_status = 'success', fail_count = 0, updated_at = NOW()
WHERE id = @Id",
new { Time = now, Id = collectAddressId });
}
catch (Exception ex)
{
_log.Error($"更新采集地址状态失败address_id={collectAddressId}", ex);
}
}
}
catch (Exception ex)
{
_log.Error($"批量写入采集记录失败地址ID={collectAddressId}", ex);
}
return lastRawLogId;
}
/// <summary>
/// 记录采集失败:更新采集地址的失败计数和状态
/// </summary>
public static void RecordFailure(string businessConnStr, int collectAddressId, string errorMsg)
{
try
{
using (var conn = new MySqlConnection(businessConnStr))
{
conn.Execute(@"UPDATE cnc_collect_address SET last_collect_status = 'fail', fail_count = fail_count + 1, updated_at = NOW() WHERE id = @Id",
new { Id = collectAddressId });
}
}
catch (Exception ex)
{
_log.Error($"记录采集失败状态时出错address_id={collectAddressId}", ex);
}
}
}
}