新增CncCollector采集服务(配置加载+JSON解析+字段映射+HTTP采集+产量跟踪+日终汇总+心跳+管理API)

main
haoliang 6 days ago
parent 9890daf9aa
commit 6fd1d616ac

@ -1,4 +1,4 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17.VisualStudioVersion = 17.0.31903.59
MinimumVisualStudioVersion = 10.0.40219.1
@ -25,6 +25,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "frontend", "frontend", "{D0
frontend\tsconfig.json = frontend\tsconfig.json
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{331E2BF6-5FA9-42A8-9170-3B8FEE1E8C2B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CncCollector", "src\CncCollector\CncCollector.csproj", "{66697FD0-07FB-4237-B119-B645470CEB04}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -63,10 +67,15 @@ Global
{4DAE1DA8-E028-4025-AA80-1E698976F74C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4DAE1DA8-E028-4025-AA80-1E698976F74C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4DAE1DA8-E028-4025-AA80-1E698976F74C}.Release|Any CPU.Build.0 = Release|Any CPU
{66697FD0-07FB-4237-B119-B645470CEB04}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{66697FD0-07FB-4237-B119-B645470CEB04}.Debug|Any CPU.Build.0 = Debug|Any CPU
{66697FD0-07FB-4237-B119-B645470CEB04}.Release|Any CPU.ActiveCfg = Release|Any CPU
{66697FD0-07FB-4237-B119-B645470CEB04}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{66697FD0-07FB-4237-B119-B645470CEB04} = {331E2BF6-5FA9-42A8-9170-3B8FEE1E8C2B}
EndGlobalSection
EndGlobal

@ -0,0 +1,163 @@
using System;
using System.Net;
using System.Text;
using System.Threading;
using Newtonsoft.Json;
using CncCollector.Core;
using log4net;
namespace CncCollector.Api
{
/// <summary>
/// 采集服务管理API HTTP服务。
/// 使用 HttpListener 提供轻量管理接口,支持启停控制和状态查询。
/// </summary>
public class CollectorApiServer
{
private static readonly ILog _log = LogManager.GetLogger(typeof(CollectorApiServer));
private readonly CollectorEngine _engine;
private readonly string _apiKey;
private readonly int _port;
private HttpListener _listener;
private Thread _listenerThread;
private volatile bool _running;
/// <summary>
/// 初始化管理API服务
/// </summary>
/// <param name="engine">采集引擎实例</param>
/// <param name="apiKey">API Key用于认证</param>
/// <param name="port">监听端口</param>
public CollectorApiServer(CollectorEngine engine, string apiKey, int port)
{
_engine = engine;
_apiKey = apiKey;
_port = port;
}
/// <summary>
/// 启动HTTP监听
/// </summary>
public void Start()
{
if (_running) return;
try
{
_listener = new HttpListener();
_listener.Prefixes.Add($"http://+:{_port}/api/collector/");
_listener.Start();
_running = true;
_listenerThread = new Thread(() =>
{
while (_running)
{
try
{
var context = _listener.GetContext();
ThreadPool.QueueUserWorkItem(_ => ProcessRequest(context));
}
catch (HttpListenerException) { break; }
catch (ObjectDisposedException) { break; }
}
}) { IsBackground = true };
_listenerThread.Start();
_log.Info($"管理API已启动: http://localhost:{_port}/api/collector/");
}
catch (Exception ex)
{
_log.Error($"管理API启动失败端口={_port}", ex);
}
}
/// <summary>
/// 停止HTTP监听
/// </summary>
public void Stop()
{
_running = false;
try { _listener?.Stop(); } catch { }
_log.Info("管理API已停止");
}
/// <summary>
/// 处理单个HTTP请求
/// </summary>
private void ProcessRequest(HttpListenerContext ctx)
{
string path = ctx.Request.Url.AbsolutePath.TrimEnd('/').ToLower();
string method = ctx.Request.HttpMethod;
try
{
// API Key 认证
string apiKeyHeader = ctx.Request.Headers["X-Api-Key"];
if (string.IsNullOrEmpty(apiKeyHeader) || apiKeyHeader != _apiKey)
{
SendJson(ctx, 401, new { code = 40101, message = "API Key无效", data = (object)null });
return;
}
// 路由
if (path == "/api/collector/status" && method == "GET")
{
var status = _engine.GetStatus();
SendSuccess(ctx, status);
}
else if (path == "/api/collector/start" && method == "POST")
{
_engine.Start();
SendSuccess(ctx, new { message = "采集服务已启动" });
}
else if (path == "/api/collector/stop" && method == "POST")
{
_engine.Stop();
SendSuccess(ctx, new { message = "采集服务已停止" });
}
else if (path == "/api/collector/refresh" && method == "POST")
{
_engine.Refresh();
SendSuccess(ctx, new { message = "配置已刷新" });
}
else
{
SendJson(ctx, 404, new { code = 40400, message = "未知端点", data = (object)null });
}
}
catch (Exception ex)
{
_log.Error("处理API请求异常", ex);
SendJson(ctx, 500, new { code = 50001, message = ex.Message, data = (object)null });
}
}
/// <summary>
/// 发送成功响应
/// </summary>
private void SendSuccess(HttpListenerContext ctx, object data)
{
SendJson(ctx, 200, new { code = 0, message = "success", data });
}
/// <summary>
/// 发送JSON响应
/// </summary>
private void SendJson(HttpListenerContext ctx, int statusCode, object obj)
{
try
{
string json = JsonConvert.SerializeObject(obj);
byte[] bytes = Encoding.UTF8.GetBytes(json);
ctx.Response.StatusCode = statusCode;
ctx.Response.ContentType = "application/json; charset=utf-8";
ctx.Response.ContentLength64 = bytes.Length;
ctx.Response.OutputStream.Write(bytes, 0, bytes.Length);
ctx.Response.OutputStream.Close();
}
catch { }
}
}
}

@ -0,0 +1,51 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net472</TargetFramework>
<PlatformTarget>x64</PlatformTarget>
<RootNamespace>CncCollector</RootNamespace>
<AssemblyName>CncCollector</AssemblyName>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<GenerateAssemblyInfo>false</GenerateAssemblyInfo>
<!-- Windows Service 控制台应用,可交互安装 -->
<OutputType>Exe</OutputType>
<OutputPath>bin\</OutputPath>
<AppendTargetFrameworkToOutputPath>false</AppendTargetFrameworkToOutputPath>
<AppendRuntimeIdentifierToOutputPath>false</AppendRuntimeIdentifierToOutputPath>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies.net472" Version="1.0.3" PrivateAssets="all" />
<!-- JSON序列化 -->
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<!-- ORM -->
<PackageReference Include="Dapper" Version="2.1.35" />
<!-- MariaDB驱动 -->
<PackageReference Include="MySqlConnector" Version="2.3.7" />
<!-- 日志 -->
<PackageReference Include="log4net" Version="2.0.15" />
</ItemGroup>
<!-- Framework 引用 -->
<ItemGroup>
<Reference Include="System.Net.Http" />
<Reference Include="System.IO" />
<Reference Include="System.Runtime" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\CncModels\CncModels.csproj" />
<ProjectReference Include="..\CncRepository\CncRepository.csproj" />
</ItemGroup>
<!-- 将配置文件复制到输出目录 -->
<ItemGroup>
<None Update="collector.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="log4net.config">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>

@ -0,0 +1,79 @@
using System;
using System.IO;
using Newtonsoft.Json;
namespace CncCollector.Config
{
/// <summary>
/// 采集服务配置(从 collector.json 读取)
/// </summary>
public class CollectorConfig
{
/// <summary>业务库连接字符串</summary>
[JsonProperty("businessConnection")]
public string BusinessConnection { get; set; }
/// <summary>日志库连接字符串</summary>
[JsonProperty("logConnection")]
public string LogConnection { get; set; }
/// <summary>管理API端口</summary>
[JsonProperty("apiPort")]
public int ApiPort { get; set; } = 5800;
/// <summary>服务间通信API Key</summary>
[JsonProperty("apiKey")]
public string ApiKey { get; set; } = "collector_api_key_2026";
/// <summary>心跳上报间隔(秒)</summary>
[JsonProperty("heartbeatIntervalSeconds")]
public int HeartbeatIntervalSeconds { get; set; } = 10;
/// <summary>配置轮询间隔(秒)</summary>
[JsonProperty("configPollIntervalSeconds")]
public int ConfigPollIntervalSeconds { get; set; } = 30;
/// <summary>日终汇总执行时间HH:mm格式</summary>
[JsonProperty("dailySummaryTime")]
public string DailySummaryTime { get; set; } = "01:00";
/// <summary>服务ID标识</summary>
[JsonProperty("serviceId")]
public string ServiceId { get; set; } = "CncCollector";
// ===== 以下为从DB加载的运行时配置 =====
/// <summary>采集失败重试次数默认3</summary>
[JsonIgnore]
public int CollectRetryCount { get; set; } = 3;
/// <summary>采集重试间隔秒数默认30</summary>
[JsonIgnore]
public int CollectRetryIntervalSeconds { get; set; } = 30;
/// <summary>连续失败N次触发告警默认5</summary>
[JsonIgnore]
public int CollectFailAlertThreshold { get; set; } = 5;
/// <summary>
/// 从文件加载配置
/// </summary>
public static CollectorConfig Load()
{
// 优先从运行目录读取
string configPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "collector.json");
if (!File.Exists(configPath))
{
// 开发模式:从项目根目录读取
configPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "..", "..", "..", "collector.json");
}
if (!File.Exists(configPath))
{
throw new FileNotFoundException("找不到配置文件 collector.json");
}
string json = File.ReadAllText(configPath);
return JsonConvert.DeserializeObject<CollectorConfig>(json);
}
}
}

@ -0,0 +1,77 @@
using System;
using System.Collections.Generic;
using Dapper;
using MySqlConnector;
using log4net;
namespace CncCollector.Config
{
/// <summary>
/// 从 cnc_sys_config 表加载运行时配置,覆盖 CollectorConfig 的默认值
/// </summary>
public static class ConfigLoader
{
private static readonly ILog _log = LogManager.GetLogger(typeof(ConfigLoader));
/// <summary>
/// 从数据库加载配置并覆盖 CollectorConfig 中的默认值
/// </summary>
/// <param name="connectionString">业务库连接字符串</param>
/// <param name="config">要覆盖的配置对象</param>
public static void LoadRuntimeConfig(string connectionString, CollectorConfig config)
{
try
{
using (var conn = new MySqlConnection(connectionString))
{
conn.Open();
var rows = conn.Query<KeyValuePair<string, string>>(
"SELECT config_key as `Key`, config_value as `Value` FROM cnc_sys_config");
foreach (var row in rows)
{
string key = row.Key;
string val = row.Value ?? "";
switch (key)
{
case "collector_api_port":
if (int.TryParse(val, out var port)) config.ApiPort = port;
break;
case "collector_api_key":
if (!string.IsNullOrEmpty(val)) config.ApiKey = val;
break;
case "heartbeat_interval":
if (int.TryParse(val, out var hb)) config.HeartbeatIntervalSeconds = hb;
break;
case "config_poll_interval":
if (int.TryParse(val, out var cp)) config.ConfigPollIntervalSeconds = cp;
break;
case "daily_summary_time":
if (!string.IsNullOrEmpty(val)) config.DailySummaryTime = val;
break;
case "collect_retry_count":
if (int.TryParse(val, out var rc)) config.CollectRetryCount = rc;
break;
case "collect_retry_interval":
if (int.TryParse(val, out var ri)) config.CollectRetryIntervalSeconds = ri;
break;
case "collect_fail_alert_threshold":
if (int.TryParse(val, out var ft)) config.CollectFailAlertThreshold = ft;
break;
}
}
}
_log.Info($"数据库配置加载完成: ApiPort={config.ApiPort}, Heartbeat={config.HeartbeatIntervalSeconds}s, "
+ $"ConfigPoll={config.ConfigPollIntervalSeconds}s, SummaryTime={config.DailySummaryTime}, "
+ $"RetryCount={config.CollectRetryCount}, RetryInterval={config.CollectRetryIntervalSeconds}s, "
+ $"FailThreshold={config.CollectFailAlertThreshold}");
}
catch (Exception ex)
{
_log.Error("从数据库加载配置失败,将使用默认值", ex);
}
}
}
}

@ -0,0 +1,186 @@
using System;
using System.Collections.Generic;
using System.Text;
using Dapper;
using MySqlConnector;
using CncModels.Entity;
using log4net;
namespace CncCollector.Core
{
/// <summary>
/// 采集数据批量写入器。
/// 负责写入采集结构化记录、原始JSON日志更新机床和地址实时状态。
/// </summary>
public class CollectRecordWriter
{
private static readonly ILog _log = LogManager.GetLogger(typeof(CollectRecordWriter));
/// <summary>
/// 批量写入采集记录并更新相关状态
/// </summary>
/// <param name="businessConnStr">业务库连接字符串</param>
/// <param name="logConnStr">日志库连接字符串</param>
/// <param name="records">采集记录列表</param>
/// <param name="rawJson">原始JSON字符串</param>
/// <param name="collectAddressId">采集地址ID</param>
/// <param name="requestTime">请求开始时间</param>
/// <param name="responseDurationMs">响应耗时(毫秒)</param>
/// <param name="isSuccess">是否采集成功</param>
/// <param name="errorMessage">错误信息(失败时)</param>
public static void WriteBatch(string businessConnStr, string logConnStr,
List<CollectRecord> records, string rawJson, int collectAddressId,
DateTime requestTime, long? responseDurationMs, bool isSuccess, string errorMessage)
{
var now = DateTime.Now;
// 1. 写入原始JSON到日志库
try
{
using (var conn = new MySqlConnection(logConnStr))
{
conn.Open();
conn.Execute(@"INSERT INTO log_collect_raw (collect_address_id, request_time, response_time, response_duration, is_success, status_code, raw_json, error_message, created_at)
VALUES (@CollectAddressId, @RequestTime, @ResponseTime, @ResponseDuration, @IsSuccess, @StatusCode, @RawJson, @ErrorMessage, @CreatedAt)",
new
{
CollectAddressId = collectAddressId,
RequestTime = requestTime,
ResponseTime = now,
ResponseDuration = responseDurationMs,
IsSuccess = isSuccess ? 1 : 0,
StatusCode = isSuccess ? (int?)200 : null,
RawJson = rawJson ?? "",
ErrorMessage = errorMessage ?? (string)null,
CreatedAt = now
});
}
}
catch (Exception ex)
{
_log.Error($"写入原始JSON日志失败地址ID={collectAddressId}", ex);
}
if (!isSuccess || records == null || records.Count == 0) return;
// 2. 批量写入采集结构化记录到业务库
try
{
using (var conn = new MySqlConnection(businessConnStr))
{
conn.Open();
using (var tran = conn.BeginTransaction())
{
try
{
foreach (var r in records)
{
conn.Execute(@"INSERT INTO cnc_collect_record (machine_id, collect_time, device_time, program_name, part_count,
device_status, run_status, operate_mode, spindle_speed_set, feed_speed_set,
spindle_speed_actual, feed_speed_actual, spindle_load, spindle_override,
power_on_time, run_time, cutting_time, cycle_time, machining_status, extra_data, created_at)
VALUES (@MachineId, @CollectTime, @DeviceTime, @ProgramName, @PartCount,
@DeviceStatus, @RunStatus, @OperateMode, @SpindleSpeedSet, @FeedSpeedSet,
@SpindleSpeedActual, @FeedSpeedActual, @SpindleLoad, @SpindleOverride,
@PowerOnTime, @RunTime, @CuttingTime, @CycleTime, @MachiningStatus, @ExtraData, @CreatedAt)",
new
{
r.MachineId,
CollectTime = r.CollectTime,
DeviceTime = r.DeviceTime,
ProgramName = r.ProgramName ?? (string)null,
PartCount = r.PartCount,
DeviceStatus = r.DeviceStatus ?? (string)null,
RunStatus = r.RunStatus ?? (string)null,
OperateMode = r.OperateMode ?? (string)null,
SpindleSpeedSet = r.SpindleSpeedSet,
FeedSpeedSet = r.FeedSpeedSet,
SpindleSpeedActual = r.SpindleSpeedActual,
FeedSpeedActual = r.FeedSpeedActual,
SpindleLoad = r.SpindleLoad,
SpindleOverride = r.SpindleOverride,
PowerOnTime = r.PowerOnTime,
RunTime = r.RunTime,
CuttingTime = r.CuttingTime,
CycleTime = r.CycleTime,
MachiningStatus = r.MachiningStatus ?? (string)null,
ExtraData = r.ExtraData ?? (string)null,
CreatedAt = now
}, tran);
}
tran.Commit();
}
catch
{
tran.Rollback();
throw;
}
}
// 3. 更新每台机床的实时状态字段
foreach (var r in records)
{
try
{
conn.Execute(@"UPDATE cnc_machine SET last_collect_time = @CollectTime,
last_device_status = @DeviceStatus, last_run_status = @RunStatus,
last_program_name = @ProgramName, last_part_count = @PartCount,
last_operate_mode = @OperateMode, last_machining_status = @MachiningStatus
WHERE id = @MachineId",
new
{
r.MachineId,
r.CollectTime,
DeviceStatus = r.DeviceStatus ?? (string)null,
RunStatus = r.RunStatus ?? (string)null,
ProgramName = r.ProgramName ?? (string)null,
r.PartCount,
OperateMode = r.OperateMode ?? (string)null,
MachiningStatus = r.MachiningStatus ?? (string)null
});
}
catch (Exception ex)
{
_log.Error($"更新机床实时状态失败machine_id={r.MachineId}", ex);
}
}
// 4. 更新采集地址状态
try
{
conn.Execute(@"UPDATE cnc_collect_address SET last_collect_time = @Time, last_collect_status = 'success', fail_count = 0, updated_at = NOW()
WHERE id = @Id",
new { Time = now, Id = collectAddressId });
}
catch (Exception ex)
{
_log.Error($"更新采集地址状态失败address_id={collectAddressId}", ex);
}
}
}
catch (Exception ex)
{
_log.Error($"批量写入采集记录失败地址ID={collectAddressId}", ex);
}
}
/// <summary>
/// 记录采集失败:更新采集地址的失败计数和状态
/// </summary>
public static void RecordFailure(string businessConnStr, int collectAddressId, string errorMsg)
{
try
{
using (var conn = new MySqlConnection(businessConnStr))
{
conn.Execute(@"UPDATE cnc_collect_address SET last_collect_status = 'fail', fail_count = fail_count + 1, updated_at = NOW() WHERE id = @Id",
new { Id = collectAddressId });
}
}
catch (Exception ex)
{
_log.Error($"记录采集失败状态时出错address_id={collectAddressId}", ex);
}
}
}
}

@ -0,0 +1,427 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Http;
using System.Net.NetworkInformation;
using System.Threading;
using Dapper;
using MySqlConnector;
using Newtonsoft.Json.Linq;
using CncModels.Entity;
using CncModels.Enum;
using CncCollector.Config;
using log4net;
namespace CncCollector.Core
{
/// <summary>
/// 单个采集地址的工作线程。
/// 循环执行Ping检测 → HTTP采集 → JSON解析 → 数据入库 → 产量跟踪。
/// </summary>
public class CollectWorker
{
private static readonly ILog _log = LogManager.GetLogger(typeof(CollectWorker));
private readonly CollectAddress _address;
private readonly CollectorConfig _config;
private readonly ProductionTracker _tracker;
private readonly string _businessConnStr;
private readonly string _logConnStr;
private Thread _thread;
private volatile bool _running;
private int _consecutiveFailCount;
/// <summary>采集地址ID</summary>
public int AddressId => _address.Id;
/// <summary>采集地址名称</summary>
public string AddressName => _address.Name;
/// <summary>是否运行中</summary>
public bool IsRunning => _running;
/// <summary>最后采集时间</summary>
public DateTime? LastCollectTime { get; private set; }
/// <summary>成功次数</summary>
public long SuccessCount { get; private set; }
/// <summary>失败次数</summary>
public long FailCount { get; private set; }
/// <summary>
/// 初始化采集工作线程
/// </summary>
/// <param name="address">采集地址配置</param>
/// <param name="config">全局配置</param>
/// <param name="tracker">产量跟踪器</param>
/// <param name="businessConnStr">业务库连接字符串</param>
/// <param name="logConnStr">日志库连接字符串</param>
public CollectWorker(CollectAddress address, CollectorConfig config, ProductionTracker tracker,
string businessConnStr, string logConnStr)
{
_address = address;
_config = config;
_tracker = tracker;
_businessConnStr = businessConnStr;
_logConnStr = logConnStr;
}
/// <summary>
/// 启动工作线程
/// </summary>
public void Start()
{
if (_running) return;
_running = true;
_thread = new Thread(Run) { IsBackground = true, Name = $"Collector-{_address.Id}" };
_thread.Start();
_log.Info($"采集工作线程已启动: {_address.Name}(间隔={_address.CollectInterval}秒)");
}
/// <summary>
/// 停止工作线程
/// </summary>
public void Stop()
{
_running = false;
_log.Info($"采集工作线程已停止: {_address.Name}");
}
/// <summary>
/// 工作线程主循环
/// </summary>
private void Run()
{
while (_running)
{
try
{
DoCollectCycle();
}
catch (Exception ex)
{
_log.Error($"采集循环异常(地址={_address.Name}", ex);
}
// 等待下一次采集
int intervalMs = _address.CollectInterval * 1000;
for (int i = 0; i < intervalMs / 100 && _running; i++)
{
Thread.Sleep(100);
}
}
}
/// <summary>
/// 执行一次完整的采集周期Ping → HTTP采集 → 解析 → 入库
/// </summary>
private void DoCollectCycle()
{
var requestTime = DateTime.Now;
// 1. Ping 检测
if (!PingAddress())
{
// Ping 失败,更新机床离线状态
UpdateMachineOnlineStatus(false);
return;
}
// Ping 成功,更新机床在线状态
UpdateMachineOnlineStatus(true);
// 2. HTTP 采集(含重试)
string rawJson = null;
bool success = false;
string errorMsg = null;
long durationMs = 0;
int retryCount = _config.CollectRetryCount;
for (int attempt = 0; attempt <= retryCount; attempt++)
{
if (attempt > 0)
{
_log.Warn($"采集重试第{attempt}次(地址={_address.Name}");
Thread.Sleep(_config.CollectRetryIntervalSeconds * 1000);
}
var sw = Stopwatch.StartNew();
try
{
using (var client = new HttpClient { Timeout = TimeSpan.FromSeconds(30) })
{
var response = client.GetAsync(_address.Url).Result;
sw.Stop();
durationMs = sw.ElapsedMilliseconds;
if (response.IsSuccessStatusCode)
{
rawJson = response.Content.ReadAsStringAsync().Result;
success = true;
break;
}
else
{
errorMsg = $"HTTP {(int)response.StatusCode}: {response.ReasonPhrase}";
}
}
}
catch (Exception ex)
{
sw.Stop();
durationMs = sw.ElapsedMilliseconds;
errorMsg = ex.Message;
}
}
if (!success)
{
_consecutiveFailCount++;
FailCount++;
_log.Warn($"采集失败(地址={_address.Name}, 连续失败={_consecutiveFailCount}: {errorMsg}");
// 写入失败记录
CollectRecordWriter.WriteBatch(_businessConnStr, _logConnStr, null, rawJson,
_address.Id, requestTime, durationMs, false, errorMsg);
CollectRecordWriter.RecordFailure(_businessConnStr, _address.Id, errorMsg);
// 连续失败超过阈值,触发告警
if (_consecutiveFailCount >= _config.CollectFailAlertThreshold)
{
CreateAlert(AlertType.CollectFail, null, _address.Id,
$"采集地址「{_address.Name}」连续{_consecutiveFailCount}次采集失败",
errorMsg);
}
return;
}
// 3. 解析 JSON
_consecutiveFailCount = 0;
SuccessCount++;
LastCollectTime = DateTime.Now;
try
{
ParseAndSave(rawJson, requestTime, durationMs);
}
catch (Exception ex)
{
_log.Error($"JSON解析/入库失败(地址={_address.Name}", ex);
}
}
/// <summary>
/// Ping 检测采集地址可达性
/// </summary>
private bool PingAddress()
{
try
{
// 从 URL 提取主机名
var uri = new Uri(_address.Url);
string host = uri.Host;
using (var ping = new Ping())
{
var reply = ping.Send(host, 5000);
return reply.Status == IPStatus.Success;
}
}
catch (Exception ex)
{
_log.Debug($"Ping失败地址={_address.Name}: {ex.Message}");
return false;
}
}
/// <summary>
/// 更新此地址下所有机床的在线状态
/// </summary>
private void UpdateMachineOnlineStatus(bool isOnline)
{
try
{
using (var conn = new MySqlConnection(_businessConnStr))
{
conn.Execute(@"UPDATE cnc_machine SET is_online = @Online, last_ping_time = NOW(), updated_at = NOW()
WHERE collect_address_id = @AddrId AND is_enabled = 1",
new { Online = isOnline ? 1 : 0, AddrId = _address.Id });
}
}
catch (Exception ex)
{
_log.Error($"更新机床在线状态失败(地址={_address.Name}", ex);
}
}
/// <summary>
/// 解析采集到的 JSON 数据并写入数据库
/// </summary>
private void ParseAndSave(string rawJson, DateTime requestTime, long durationMs)
{
var collectTime = DateTime.Now;
// 1. 加载品牌配置和字段映射
Brand brand = null;
List<BrandFieldMapping> mappings = null;
List<Machine> machines = null;
using (var conn = new MySqlConnection(_businessConnStr))
{
brand = conn.QueryFirstOrDefault<Brand>(
"SELECT id as Id, brand_name as BrandName, device_field as DeviceField, tags_path as TagsPath, is_enabled as IsEnabled, created_at as CreatedAt, updated_at as UpdatedAt FROM cnc_brand WHERE id = @BrandId",
new { BrandId = _address.BrandId });
if (brand != null)
{
mappings = conn.Query<BrandFieldMapping>(
"SELECT id as Id, brand_id as BrandId, standard_field as StandardField, field_name as FieldName, match_by as MatchBy, data_type as DataType, is_required as IsRequired, created_at as CreatedAt FROM cnc_brand_field_mapping WHERE brand_id = @BrandId",
new { BrandId = brand.Id }).AsList();
}
// 加载此地址下的机床列表
machines = conn.Query<Machine>(
"SELECT id as Id, device_code as DeviceCode, name as Name, workshop_id as WorkshopId, collect_address_id as CollectAddressId, ip_address as IpAddress, brand_id as BrandId, is_enabled as IsEnabled, is_online as IsOnline, last_ping_time as LastPingTime, last_collect_time as LastCollectTime, last_device_status as LastDeviceStatus, last_run_status as LastRunStatus, last_program_name as LastProgramName, last_part_count as LastPartCount, last_operate_mode as LastOperateMode, last_machining_status as LastMachiningStatus, created_at as CreatedAt, updated_at as UpdatedAt FROM cnc_machine WHERE collect_address_id = @AddrId AND is_enabled = 1",
new { AddrId = _address.Id }).AsList();
}
if (brand == null || mappings == null || mappings.Count == 0)
{
_log.Warn($"品牌配置或字段映射缺失(地址={_address.Name}, brandId={_address.BrandId}");
return;
}
// 2. 构建 device_code → machine 的查找字典
var machineDict = new Dictionary<string, Machine>(StringComparer.OrdinalIgnoreCase);
if (machines != null)
{
foreach (var m in machines)
{
if (!string.IsNullOrEmpty(m.DeviceCode))
machineDict[m.DeviceCode] = m;
}
}
// 3. 解析 JSON 数组
JArray devices;
try
{
devices = JArray.Parse(rawJson);
}
catch (Exception ex)
{
_log.Error($"JSON解析失败地址={_address.Name}", ex);
return;
}
var records = new List<CollectRecord>();
foreach (var deviceToken in devices)
{
var deviceObj = deviceToken as JObject;
if (deviceObj == null) continue;
// 提取 device 字段值
string deviceCode = DataParser.ExtractDeviceCode(deviceObj, brand.DeviceField ?? "device");
if (string.IsNullOrEmpty(deviceCode))
{
_log.Warn("发现无device字段的设备数据跳过");
continue;
}
// 匹配机床
Machine machine;
if (!machineDict.TryGetValue(deviceCode, out machine))
{
// 未知设备 → 记录告警
_log.Warn($"未知设备: device={deviceCode}(地址={_address.Name}");
CreateAlert(AlertType.UnknownDevice, null, _address.Id,
$"发现未知设备: {deviceCode}",
$"采集地址「{_address.Name}」返回了未在机床表中配置的设备编码: {deviceCode}");
continue;
}
// 解析字段
var parsed = DataParser.ParseDevice(deviceObj, brand, mappings);
// 构建 CollectRecord
var record = new CollectRecord
{
MachineId = machine.Id,
CollectTime = collectTime,
ProgramName = GetStringValue(parsed, "program_name"),
PartCount = GetDecimalValue(parsed, "part_count"),
DeviceStatus = GetStringValue(parsed, "device_status"),
RunStatus = GetStringValue(parsed, "run_status"),
OperateMode = GetStringValue(parsed, "operate_mode"),
SpindleSpeedSet = GetDecimalValue(parsed, "spindle_speed_set"),
FeedSpeedSet = GetDecimalValue(parsed, "feed_speed_set"),
SpindleSpeedActual = GetDecimalValue(parsed, "spindle_speed_actual"),
FeedSpeedActual = GetDecimalValue(parsed, "feed_speed_actual"),
SpindleLoad = GetDecimalValue(parsed, "spindle_load"),
SpindleOverride = GetDecimalValue(parsed, "spindle_override"),
PowerOnTime = GetDecimalValue(parsed, "power_on_time"),
RunTime = GetDecimalValue(parsed, "run_time"),
CuttingTime = GetDecimalValue(parsed, "cutting_time"),
CycleTime = GetDecimalValue(parsed, "cycle_time"),
MachiningStatus = GetStringValue(parsed, "machining_status")
};
records.Add(record);
// 产量跟踪
_tracker.Track(machine.Id, record.ProgramName, record.PartCount, collectTime);
}
// 4. 批量写入
CollectRecordWriter.WriteBatch(_businessConnStr, _logConnStr, records, rawJson,
_address.Id, requestTime, durationMs, true, null);
_log.Info($"采集完成: {_address.Name} → {records.Count}台设备, {durationMs}ms");
}
/// <summary>
/// 从解析结果中获取字符串值
/// </summary>
private string GetStringValue(Dictionary<string, DataParser.ParsedField> parsed, string field)
{
DataParser.ParsedField pf;
if (parsed.TryGetValue(field, out pf))
return pf.StringValue;
return null;
}
/// <summary>
/// 从解析结果中获取数值
/// </summary>
private decimal? GetDecimalValue(Dictionary<string, DataParser.ParsedField> parsed, string field)
{
DataParser.ParsedField pf;
if (parsed.TryGetValue(field, out pf))
return pf.NumericValue;
return null;
}
/// <summary>
/// 创建告警记录
/// </summary>
private void CreateAlert(string alertType, int? machineId, int? addressId, string title, string detail)
{
try
{
using (var conn = new MySqlConnection(_businessConnStr))
{
conn.Execute(@"INSERT INTO cnc_alert (alert_type, machine_id, collect_address_id, title, detail, is_resolved, created_at)
VALUES (@AlertType, @MachineId, @AddressId, @Title, @Detail, 0, NOW())",
new { AlertType = alertType, MachineId = machineId, AddressId = addressId, Title = title, Detail = detail });
}
}
catch (Exception ex)
{
_log.Error("创建告警记录失败", ex);
}
}
}
}

@ -0,0 +1,318 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using Dapper;
using MySqlConnector;
using CncModels.Entity;
using CncCollector.Config;
using log4net;
namespace CncCollector.Core
{
/// <summary>
/// 采集引擎主控。
/// 负责加载采集地址、管理工作线程、心跳上报、配置轮询和日终汇总调度。
/// </summary>
public class CollectorEngine
{
private static readonly ILog _log = LogManager.GetLogger(typeof(CollectorEngine));
private readonly CollectorConfig _config;
private readonly ConcurrentDictionary<int, CollectWorker> _workers = new ConcurrentDictionary<int, CollectWorker>();
private readonly ProductionTracker _tracker;
private readonly DailySummaryJob _dailySummary;
private Timer _heartbeatTimer;
private Timer _configPollTimer;
private Timer _dailySummaryTimer;
private DateTime _startTime;
private long _totalSuccess;
private long _totalFail;
private volatile bool _isRunning;
private DateTime _lastSummaryDate = DateTime.MinValue;
/// <summary>是否运行中</summary>
public bool IsRunning => _isRunning;
/// <summary>启动时间</summary>
public DateTime StartTime => _startTime;
/// <summary>运行时长(秒)</summary>
public long UptimeSeconds => _isRunning ? (long)(DateTime.Now - _startTime).TotalSeconds : 0;
/// <summary>工作线程数量</summary>
public int WorkerCount => _workers.Count;
/// <summary>
/// 初始化采集引擎
/// </summary>
/// <param name="config">全局配置</param>
public CollectorEngine(CollectorConfig config)
{
_config = config;
_tracker = new ProductionTracker(config.BusinessConnection);
_dailySummary = new DailySummaryJob(config.BusinessConnection);
}
/// <summary>
/// 启动采集引擎:加载地址、启动工作线程、启动定时任务
/// </summary>
public void Start()
{
if (_isRunning) return;
_log.Info("===== 采集引擎启动 =====");
_startTime = DateTime.Now;
_isRunning = true;
// 1. 加载并启动采集地址
LoadAndStartWorkers();
// 2. 启动心跳上报定时器
_heartbeatTimer = new Timer(OnHeartbeat, null,
TimeSpan.FromSeconds(_config.HeartbeatIntervalSeconds),
TimeSpan.FromSeconds(_config.HeartbeatIntervalSeconds));
// 3. 启动配置轮询定时器
_configPollTimer = new Timer(OnConfigPoll, null,
TimeSpan.FromSeconds(_config.ConfigPollIntervalSeconds),
TimeSpan.FromSeconds(_config.ConfigPollIntervalSeconds));
// 4. 启动日终汇总检查定时器(每分钟检查一次)
_dailySummaryTimer = new Timer(OnDailySummaryCheck, null,
TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
_log.Info($"===== 采集引擎已启动({_workers.Count}个采集地址)=====");
}
/// <summary>
/// 停止采集引擎
/// </summary>
public void Stop()
{
if (!_isRunning) return;
_log.Info("===== 采集引擎停止中 =====");
_isRunning = false;
// 停止所有工作线程
foreach (var kvp in _workers)
{
kvp.Value.Stop();
}
_workers.Clear();
// 结账所有活跃段
_tracker.Dispose();
// 停止定时器
_heartbeatTimer?.Dispose();
_configPollTimer?.Dispose();
_dailySummaryTimer?.Dispose();
// 写入停止状态心跳
WriteHeartbeat("stopped");
_log.Info("===== 采集引擎已停止 =====");
}
/// <summary>
/// 刷新配置:重新加载采集地址,处理新增/删除/变更
/// </summary>
public void Refresh()
{
_log.Info("刷新采集配置...");
LoadAndStartWorkers();
_log.Info("采集配置刷新完成");
}
/// <summary>
/// 获取引擎状态摘要
/// </summary>
public Dictionary<string, object> GetStatus()
{
var status = new Dictionary<string, object>
{
["isRunning"] = _isRunning,
["startTime"] = _startTime.ToString("yyyy-MM-dd HH:mm:ss"),
["uptimeSeconds"] = UptimeSeconds,
["workerCount"] = _workers.Count,
["totalSuccess"] = _totalSuccess,
["totalFail"] = _totalFail
};
var workerList = new List<Dictionary<string, object>>();
foreach (var kvp in _workers)
{
workerList.Add(new Dictionary<string, object>
{
["addressId"] = kvp.Key,
["name"] = kvp.Value.AddressName,
["isRunning"] = kvp.Value.IsRunning
});
}
status["workers"] = workerList;
return status;
}
/// <summary>
/// 加载启用的采集地址并启动工作线程
/// </summary>
private void LoadAndStartWorkers()
{
try
{
List<CollectAddress> addresses;
using (var conn = new MySqlConnection(_config.BusinessConnection))
{
addresses = conn.Query<CollectAddress>(
"SELECT id as Id, name as Name, url as Url, brand_id as BrandId, collect_interval as CollectInterval, is_enabled as IsEnabled, last_collect_time as LastCollectTime, last_collect_status as LastCollectStatus, fail_count as FailCount, created_at as CreatedAt, updated_at as UpdatedAt FROM cnc_collect_address WHERE is_enabled = 1"
).AsList();
}
// 停止已删除的地址
foreach (var kvp in _workers)
{
bool exists = false;
foreach (var addr in addresses)
{
if (addr.Id == kvp.Key) { exists = true; break; }
}
if (!exists)
{
kvp.Value.Stop();
CollectWorker removed;
_workers.TryRemove(kvp.Key, out removed);
_log.Info($"已停止删除的采集地址: {kvp.Value.AddressName}");
}
}
// 启动新增的地址
foreach (var addr in addresses)
{
if (!_workers.ContainsKey(addr.Id))
{
var worker = new CollectWorker(addr, _config, _tracker,
_config.BusinessConnection, _config.LogConnection);
worker.Start();
_workers[addr.Id] = worker;
_log.Info($"已启动采集地址: {addr.Name}URL={addr.Url}, 间隔={addr.CollectInterval}秒)");
}
}
}
catch (Exception ex)
{
_log.Error("加载采集地址失败", ex);
}
}
/// <summary>
/// 心跳上报回调
/// </summary>
private void OnHeartbeat(object state)
{
try
{
// 统计成功/失败次数
long success = 0, fail = 0;
foreach (var kvp in _workers)
{
success += kvp.Value.SuccessCount;
fail += kvp.Value.FailCount;
}
_totalSuccess = success;
_totalFail = fail;
WriteHeartbeat("running");
}
catch (Exception ex)
{
_log.Error("心跳上报失败", ex);
}
}
/// <summary>
/// 写入心跳记录到 log_collector_heartbeat
/// </summary>
private void WriteHeartbeat(string status)
{
try
{
using (var conn = new MySqlConnection(_config.LogConnection))
{
conn.Execute(@"INSERT INTO log_collector_heartbeat (service_id, status, last_collect_time, success_count, fail_count, uptime_seconds, detail, created_at)
VALUES (@ServiceId, @Status, @LastCollectTime, @SuccessCount, @FailCount, @UptimeSeconds, @Detail, NOW())",
new
{
ServiceId = _config.ServiceId,
Status = status,
LastCollectTime = DateTime.Now,
SuccessCount = _totalSuccess,
FailCount = _totalFail,
UptimeSeconds = UptimeSeconds,
Detail = (string)null
});
}
}
catch (Exception ex)
{
_log.Error("写入心跳记录失败", ex);
}
}
/// <summary>
/// 配置轮询回调:检查是否有配置变更
/// </summary>
private void OnConfigPoll(object state)
{
try
{
// 重新从DB加载运行时配置
ConfigLoader.LoadRuntimeConfig(_config.BusinessConnection, _config);
// 重新加载采集地址
LoadAndStartWorkers();
}
catch (Exception ex)
{
_log.Error("配置轮询失败", ex);
}
}
/// <summary>
/// 日终汇总检查回调:检查是否到了配置的汇总时间
/// </summary>
private void OnDailySummaryCheck(object state)
{
try
{
// 解析汇总时间
TimeSpan summaryTime;
if (!TimeSpan.TryParse(_config.DailySummaryTime, out summaryTime))
{
summaryTime = new TimeSpan(1, 0, 0); // 默认01:00
}
var now = DateTime.Now;
var targetTime = new DateTime(now.Year, now.Month, now.Day, summaryTime.Hours, summaryTime.Minutes, 0);
// 检查是否到了汇总时间±1分钟内
if (Math.Abs((now - targetTime).TotalMinutes) <= 1)
{
// 汇总昨天的数据
DateTime summaryDate = now.Date.AddDays(-1);
if (_lastSummaryDate != summaryDate)
{
_lastSummaryDate = summaryDate;
_dailySummary.Execute(summaryDate);
}
}
}
catch (Exception ex)
{
_log.Error("日终汇总检查失败", ex);
}
}
}
}

@ -0,0 +1,134 @@
using System;
using System.Collections.Generic;
using Dapper;
using MySqlConnector;
using CncModels.Entity;
using CncModels.Enum;
using log4net;
namespace CncCollector.Core
{
/// <summary>
/// 日终汇总作业。
/// 在配置的时间默认01:00执行结账所有活跃段汇总日产量数据。
/// </summary>
public class DailySummaryJob
{
private static readonly ILog _log = LogManager.GetLogger(typeof(DailySummaryJob));
private readonly string _businessConnStr;
/// <summary>
/// 初始化日终汇总作业
/// </summary>
/// <param name="businessConnStr">业务库连接字符串</param>
public DailySummaryJob(string businessConnStr)
{
_businessConnStr = businessConnStr;
}
/// <summary>
/// 执行日终汇总,汇总指定日期的产量数据
/// </summary>
/// <param name="summaryDate">要汇总的日期(通常为昨天)</param>
public void Execute(DateTime summaryDate)
{
_log.Info($"========== 日终汇总开始(日期={summaryDate:yyyy-MM-dd} ==========");
var sw = System.Diagnostics.Stopwatch.StartNew();
try
{
using (var conn = new MySqlConnection(_businessConnStr))
{
conn.Open();
using (var tran = conn.BeginTransaction())
{
try
{
// 1. 结账所有活跃段end_of_day
conn.Execute(@"UPDATE cnc_production_segment
SET end_time = COALESCE(end_time, NOW()),
close_reason = @Reason, is_settled = 1, updated_at = NOW()
WHERE is_settled = 0 AND DATE(start_time) <= @Date",
new { Reason = SegmentCloseReason.EndOfDay, Date = summaryDate }, tran);
// 2. 为 quantity 为 NULL 的段计算产量
conn.Execute(@"UPDATE cnc_production_segment
SET quantity = GREATEST(0, COALESCE(end_part_count, 0) - start_part_count)
WHERE quantity IS NULL AND is_settled = 1 AND DATE(start_time) <= @Date",
new { Date = summaryDate }, tran);
// 3. 按 (machine_id, production_date, program_name) 汇总到 cnc_daily_production
// 先删除旧汇总(幂等)
conn.Execute(@"DELETE FROM cnc_daily_production WHERE production_date = @Date",
new { Date = summaryDate }, tran);
// 插入新汇总
conn.Execute(@"INSERT INTO cnc_daily_production (machine_id, production_date, program_name, total_quantity, segment_count, total_run_time, total_cutting_time, total_cycle_time, created_at, updated_at)
SELECT machine_id, @Date, program_name,
SUM(COALESCE(quantity, 0)),
COUNT(*),
NULL, NULL, NULL,
NOW(), NOW()
FROM cnc_production_segment
WHERE DATE(start_time) = @Date AND is_settled = 1
GROUP BY machine_id, program_name",
new { Date = summaryDate }, tran);
// 4. 汇总到 cnc_worker_daily_summary通过绑定关系
conn.Execute(@"DELETE FROM cnc_worker_daily_summary WHERE production_date = @Date",
new { Date = summaryDate }, tran);
conn.Execute(@"INSERT INTO cnc_worker_daily_summary (worker_id, production_date, total_quantity, machine_count, program_count, created_at, updated_at)
SELECT wm.worker_id, @Date,
SUM(dp.total_quantity),
COUNT(DISTINCT dp.machine_id),
COUNT(DISTINCT dp.program_name),
NOW(), NOW()
FROM cnc_daily_production dp
INNER JOIN cnc_worker_machine wm ON dp.machine_id = wm.machine_id
WHERE dp.production_date = @Date
GROUP BY wm.worker_id",
new { Date = summaryDate }, tran);
// 5. 更新 cnc_machine_daily_status
conn.Execute(@"DELETE FROM cnc_machine_daily_status WHERE production_date = @Date",
new { Date = summaryDate }, tran);
// 有采集记录的 → normal
conn.Execute(@"INSERT INTO cnc_machine_daily_status (machine_id, production_date, data_status, created_at, updated_at)
SELECT DISTINCT machine_id, @Date, 'normal', NOW(), NOW()
FROM cnc_collect_record WHERE DATE(collect_time) = @Date",
new { Date = summaryDate }, tran);
// 未开机 → offline
conn.Execute(@"INSERT INTO cnc_machine_daily_status (machine_id, production_date, data_status, created_at, updated_at)
SELECT m.id, @Date, 'offline', NOW(), NOW()
FROM cnc_machine m
WHERE m.is_enabled = 1 AND m.id NOT IN (
SELECT DISTINCT machine_id FROM cnc_collect_record WHERE DATE(collect_time) = @Date
) AND m.id NOT IN (
SELECT machine_id FROM cnc_machine_daily_status WHERE production_date = @Date
)",
new { Date = summaryDate }, tran);
tran.Commit();
}
catch
{
tran.Rollback();
throw;
}
}
}
sw.Stop();
_log.Info($"========== 日终汇总完成(日期={summaryDate:yyyy-MM-dd}, 耗时={sw.ElapsedMilliseconds}ms ==========");
}
catch (Exception ex)
{
_log.Error($"日终汇总失败(日期={summaryDate:yyyy-MM-dd}", ex);
}
}
}
}

@ -0,0 +1,124 @@
using System;
using System.Collections.Generic;
using Newtonsoft.Json.Linq;
using CncModels.Entity;
using log4net;
namespace CncCollector.Core
{
/// <summary>
/// JSON 解析 + 字段映射引擎。
/// 根据品牌配置从采集到的 JSON 中提取结构化字段数据。
/// </summary>
public class DataParser
{
private static readonly ILog _log = LogManager.GetLogger(typeof(DataParser));
/// <summary>
/// 解析后的字段容器
/// </summary>
public class ParsedField
{
/// <summary>标准字段名(如 program_name</summary>
public string StandardField { get; set; }
/// <summary>字段值(字符串形式)</summary>
public string StringValue { get; set; }
/// <summary>数值(如果可解析为 decimal</summary>
public decimal? NumericValue { get; set; }
/// <summary>数据类型number/string</summary>
public string DataType { get; set; }
}
/// <summary>
/// 解析单台设备的 JSON tags 数据
/// </summary>
/// <param name="deviceObj">设备 JSON 对象</param>
/// <param name="brand">品牌配置</param>
/// <param name="mappings">品牌字段映射列表</param>
/// <returns>标准字段名 → 解析后的字段值</returns>
public static Dictionary<string, ParsedField> ParseDevice(JObject deviceObj, Brand brand, List<BrandFieldMapping> mappings)
{
var result = new Dictionary<string, ParsedField>();
if (deviceObj == null || mappings == null || mappings.Count == 0)
return result;
// 定位 tags 数组
string tagsPath = brand?.TagsPath ?? "tags";
JToken tagsToken = deviceObj.SelectToken(tagsPath);
if (tagsToken == null || tagsToken.Type != JTokenType.Array)
{
_log.Warn($"无法定位 tags 数组(路径: {tagsPath}");
return result;
}
var tagsArray = tagsToken as JArray;
// 构建 tag id → value 的快速查找字典
var tagDict = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
foreach (var tag in tagsArray)
{
string id = tag["id"]?.ToString() ?? "";
string value = tag["value"]?.ToString() ?? "";
if (!string.IsNullOrEmpty(id))
{
tagDict[id] = value;
}
}
// 按字段映射表提取
foreach (var mapping in mappings)
{
string tagId = mapping.FieldName; // FieldName 存的是 tag 的 id如 "Tag5"
string standardField = mapping.StandardField;
string dataType = mapping.DataType ?? "string";
string rawValue;
if (!tagDict.TryGetValue(tagId, out rawValue))
{
// 字段不存在,跳过(非必填字段可能不存在)
continue;
}
var field = new ParsedField
{
StandardField = standardField,
DataType = dataType,
StringValue = rawValue
};
// 数值型:去除 .00000 尾缀
if (dataType == "number" && !string.IsNullOrEmpty(rawValue))
{
// 尝试解析为 decimal
string cleanValue = rawValue.Trim();
decimal numVal;
if (decimal.TryParse(cleanValue, System.Globalization.NumberStyles.Float,
System.Globalization.CultureInfo.InvariantCulture, out numVal))
{
field.NumericValue = numVal;
}
}
result[standardField] = field;
}
return result;
}
/// <summary>
/// 从设备 JSON 对象中提取 device 字段值
/// </summary>
/// <param name="deviceObj">设备 JSON 对象</param>
/// <param name="deviceField">品牌配置的 device 字段名(默认 "device"</param>
/// <returns>device 值</returns>
public static string ExtractDeviceCode(JObject deviceObj, string deviceField = "device")
{
if (deviceObj == null) return null;
return deviceObj[deviceField]?.ToString();
}
}
}

@ -0,0 +1,260 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Dapper;
using MySqlConnector;
using CncModels.Entity;
using CncModels.Enum;
using log4net;
namespace CncCollector.Core
{
/// <summary>
/// 零件产量分段跟踪引擎。
/// 为每台机床维护内存中的当前活跃段状态,检测程序切换和手动清零。
/// </summary>
public class ProductionTracker : IDisposable
{
private static readonly ILog _log = LogManager.GetLogger(typeof(ProductionTracker));
private readonly string _connectionString;
private readonly object _lock = new object();
/// <summary>
/// 内存缓存machineId → 当前活跃段ID减少DB查询
/// </summary>
private readonly ConcurrentDictionary<int, long> _activeSegmentIds = new ConcurrentDictionary<int, long>();
/// <summary>
/// 内存缓存machineId → 上一次采集的 (programName, partCount)
/// </summary>
private readonly ConcurrentDictionary<int, Tuple<string, decimal?>> _lastCollectState = new ConcurrentDictionary<int, Tuple<string, decimal?>>();
/// <summary>
/// 初始化产量跟踪器
/// </summary>
/// <param name="connectionString">业务库连接字符串</param>
public ProductionTracker(string connectionString)
{
_connectionString = connectionString;
}
/// <summary>
/// 处理一次采集结果:检测程序切换/手动清零,管理活跃段
/// </summary>
/// <param name="machineId">机床ID</param>
/// <param name="programName">当前NC程序名</param>
/// <param name="partCount">当前零件数</param>
/// <param name="collectTime">采集时间</param>
public void Track(int machineId, string programName, decimal? partCount, DateTime collectTime)
{
if (string.IsNullOrEmpty(programName)) return;
lock (_lock)
{
try
{
// 获取上次采集状态
Tuple<string, decimal?> lastState;
bool hasLast = _lastCollectState.TryGetValue(machineId, out lastState);
// 检测是否需要结账
bool needClose = false;
string closeReason = "";
if (hasLast)
{
string lastProgram = lastState.Item1;
decimal? lastPartCount = lastState.Item2;
// 情况1NC程序名变化
if (!string.Equals(lastProgram, programName, StringComparison.OrdinalIgnoreCase))
{
needClose = true;
closeReason = SegmentCloseReason.ProgramChange;
_log.Info($"机床{machineId}: 程序切换 {lastProgram} → {programName}{closeReason}");
}
// 情况2同程序下 part_count 下降(手动清零)
else if (partCount.HasValue && lastPartCount.HasValue && partCount.Value < lastPartCount.Value)
{
needClose = true;
closeReason = SegmentCloseReason.ManualReset;
_log.Info($"机床{machineId}: 零件数下降 {lastPartCount} → {partCount}{closeReason}");
}
}
// 结账当前活跃段
if (needClose)
{
CloseActiveSegment(machineId, partCount, closeReason, collectTime);
}
// 确保有活跃段
long activeSegmentId = EnsureActiveSegment(machineId, programName, partCount, collectTime);
// 更新活跃段的实时 end_part_count
if (activeSegmentId > 0 && partCount.HasValue)
{
UpdateSegmentEndCount(activeSegmentId, partCount.Value);
}
// 更新内存缓存
_lastCollectState[machineId] = Tuple.Create(programName, partCount);
}
catch (Exception ex)
{
_log.Error($"产量跟踪处理失败machine_id={machineId}", ex);
}
}
}
/// <summary>
/// 服务停止时结账所有活跃段
/// </summary>
public void CloseAllActiveSegments()
{
_log.Info("正在结账所有活跃段(服务停止)...");
try
{
using (var conn = new MySqlConnection(_connectionString))
{
// 结账所有活跃段is_settled=0 且 end_time IS NULL
conn.Execute(@"UPDATE cnc_production_segment
SET end_time = NOW(), end_part_count = start_part_count, quantity = 0,
close_reason = @Reason, is_settled = 1, updated_at = NOW()
WHERE is_settled = 0 AND end_time IS NULL",
new { Reason = SegmentCloseReason.ServiceStop });
}
_activeSegmentIds.Clear();
_lastCollectState.Clear();
_log.Info("所有活跃段已结账");
}
catch (Exception ex)
{
_log.Error("结账所有活跃段失败", ex);
}
}
/// <summary>
/// 结账指定机床的活跃段
/// </summary>
/// <param name="machineId">机床ID</param>
/// <param name="endPartCount">结束时的零件数</param>
/// <param name="reason">关闭原因</param>
/// <param name="endTime">结束时间</param>
public void CloseActiveSegment(int machineId, decimal? endPartCount, string reason, DateTime endTime)
{
try
{
long segmentId;
if (!_activeSegmentIds.TryGetValue(machineId, out segmentId))
{
// 从DB查找
using (var conn = new MySqlConnection(_connectionString))
{
segmentId = conn.ExecuteScalar<long>(
"SELECT id FROM cnc_production_segment WHERE machine_id=@MId AND is_settled=0 AND end_time IS NULL ORDER BY start_time DESC LIMIT 1",
new { MId = machineId });
}
}
if (segmentId <= 0) return;
using (var conn = new MySqlConnection(_connectionString))
{
// 获取 start_part_count 计算 quantity
var startCount = conn.ExecuteScalar<decimal?>(
"SELECT start_part_count FROM cnc_production_segment WHERE id=@Id", new { Id = segmentId });
decimal? quantity = null;
if (startCount.HasValue && endPartCount.HasValue)
{
quantity = Math.Max(0, endPartCount.Value - startCount.Value);
}
conn.Execute(@"UPDATE cnc_production_segment
SET end_time = @EndTime, end_part_count = @EndPartCount, quantity = @Quantity,
close_reason = @Reason, is_settled = 1, updated_at = NOW()
WHERE id = @Id",
new { Id = segmentId, EndTime = endTime, EndPartCount = endPartCount, Quantity = quantity, Reason = reason });
}
_activeSegmentIds.TryRemove(machineId, out _);
_log.Debug($"机床{machineId}: 结账段{segmentId}{reason}quantity={endPartCount}");
}
catch (Exception ex)
{
_log.Error($"结账活跃段失败machine_id={machineId}", ex);
}
}
/// <summary>
/// 确保指定机床有一个活跃段,没有则创建
/// </summary>
private long EnsureActiveSegment(int machineId, string programName, decimal? partCount, DateTime collectTime)
{
long segmentId;
if (_activeSegmentIds.TryGetValue(machineId, out segmentId) && segmentId > 0)
return segmentId;
// 从DB查找
using (var conn = new MySqlConnection(_connectionString))
{
segmentId = conn.ExecuteScalar<long>(
"SELECT id FROM cnc_production_segment WHERE machine_id=@MId AND is_settled=0 AND end_time IS NULL ORDER BY start_time DESC LIMIT 1",
new { MId = machineId });
if (segmentId <= 0)
{
// 创建新段
var now = DateTime.Now;
segmentId = conn.ExecuteScalar<long>(@"INSERT INTO cnc_production_segment
(machine_id, program_name, production_date, start_time, start_part_count, is_settled, created_at, updated_at)
VALUES (@MachineId, @ProgramName, @ProductionDate, @StartTime, @StartPartCount, 0, @CreatedAt, @UpdatedAt);
SELECT LAST_INSERT_ID();",
new
{
MachineId = machineId,
ProgramName = programName,
ProductionDate = collectTime.Date,
StartTime = collectTime,
StartPartCount = partCount ?? 0,
CreatedAt = now,
UpdatedAt = now
});
_log.Debug($"机床{machineId}: 创建新段{segmentId}(程序={programName}, startCount={partCount}");
}
}
_activeSegmentIds[machineId] = segmentId;
return segmentId;
}
/// <summary>
/// 更新活跃段的实时 end_part_count
/// </summary>
private void UpdateSegmentEndCount(long segmentId, decimal partCount)
{
try
{
using (var conn = new MySqlConnection(_connectionString))
{
conn.Execute("UPDATE cnc_production_segment SET end_part_count = @Count, updated_at = NOW() WHERE id = @Id",
new { Id = segmentId, Count = partCount });
}
}
catch (Exception ex)
{
_log.Error($"更新活跃段end_part_count失败segment_id={segmentId}", ex);
}
}
public void Dispose()
{
CloseAllActiveSegments();
}
}
}

@ -0,0 +1,87 @@
using System;
using System.IO;
using CncCollector.Config;
using CncCollector.Core;
using CncCollector.Api;
using log4net;
namespace CncCollector
{
/// <summary>
/// CNC机床数据采集服务主入口。
/// 支持控制台模式运行和Windows Service部署。
/// </summary>
class Program
{
private static readonly ILog _log = LogManager.GetLogger(typeof(Program));
static void Main(string[] args)
{
// 初始化log4net
var logConfig = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "log4net.config");
if (File.Exists(logConfig))
{
log4net.Config.XmlConfigurator.Configure(new FileInfo(logConfig));
}
else
{
// 开发模式:从项目根目录读取
logConfig = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "..", "..", "..", "log4net.config");
if (File.Exists(logConfig))
log4net.Config.XmlConfigurator.Configure(new FileInfo(logConfig));
}
Console.WriteLine("CNC 机床数据采集服务 v1.0");
Console.WriteLine("================================================");
// 加载配置
CollectorConfig config;
try
{
config = CollectorConfig.Load();
_log.Info("配置文件加载成功");
}
catch (Exception ex)
{
Console.WriteLine($"错误: 加载配置文件失败 - {ex.Message}");
_log.Fatal("加载配置文件失败", ex);
Console.WriteLine("按任意键退出...");
Console.ReadKey();
return;
}
// 从数据库加载运行时配置
try
{
ConfigLoader.LoadRuntimeConfig(config.BusinessConnection, config);
}
catch (Exception ex)
{
_log.Warn("从数据库加载配置失败,使用文件默认值", ex);
Console.WriteLine($" [!] 数据库配置加载失败: {ex.Message}");
}
// 创建引擎和API
var engine = new CollectorEngine(config);
var apiServer = new CollectorApiServer(engine, config.ApiKey, config.ApiPort);
// 启动引擎
engine.Start();
// 启动管理API
apiServer.Start();
Console.WriteLine($"\n管理API: http://localhost:{config.ApiPort}/api/collector/status");
Console.WriteLine($"API Key: {config.ApiKey}");
Console.WriteLine($"\n按任意键退出...");
Console.ReadKey();
// 退出
engine.Stop();
apiServer.Stop();
_log.Info("采集服务已退出");
Console.WriteLine("已退出。");
}
}
}

@ -0,0 +1,18 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// 有关程序集的一般信息由以下特性控制。
// 更改这些特性值可修改与程序集关联的信息。
[assembly: AssemblyTitle("CncCollector")]
[assembly: AssemblyDescription("CNC机床数据采集服务")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("CncCollector")]
[assembly: AssemblyCopyright("Copyright © 2026")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// 程序集的版本信息由下列四个值组成:
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]

@ -0,0 +1,10 @@
{
"businessConnection": "Server=localhost;Database=cnc_business;Uid=root;Pwd=root;Charset=utf8mb4;SslMode=None;",
"logConnection": "Server=localhost;Database=cnc_log;Uid=root;Pwd=root;Charset=utf8mb4;SslMode=None;",
"apiPort": 5800,
"apiKey": "collector_api_key_2026",
"heartbeatIntervalSeconds": 10,
"configPollIntervalSeconds": 30,
"dailySummaryTime": "01:00",
"serviceId": "CncCollector"
}

@ -0,0 +1,24 @@
<?xml version="1.0" encoding="utf-8" ?>
<log4net>
<appender name="ConsoleAppender" type="log4net.Appender.ConsoleAppender">
<layout type="log4net.Layout.PatternLayout">
<conversionPattern value="%date [%thread] %-5level %logger - %message%newline" />
</layout>
</appender>
<appender name="RollingFileAppender" type="log4net.Appender.RollingFileAppender">
<file value="logs\\collector.log" />
<appendToFile value="true" />
<rollingStyle value="Date" />
<datePattern value="yyyyMMdd" />
<maxSizeRollBackups value="30" />
<maximumFileSize value="10MB" />
<layout type="log4net.Layout.PatternLayout">
<conversionPattern value="%date [%thread] %-5level %logger - %message%newline" />
</layout>
</appender>
<root>
<level value="INFO" />
<appender-ref ref="ConsoleAppender" />
<appender-ref ref="RollingFileAppender" />
</root>
</log4net>
Loading…
Cancel
Save