You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
34 KiB
34 KiB
CNC机床数据采集系统 - 采集服务设计文档
最后更新:2026-04-30 状态:设计中
一、概述
采集服务(CncCollector)是一个独立 Windows Service,负责定时从采集地址HTTP拉取JSON数据,通过品牌字段映射解析为结构化数据,实时更新机床状态,追踪产量分段,并执行日终汇总。
1.1 核心职责
| 职责 | 说明 |
|---|---|
| HTTP数据拉取 | 按采集地址配置的间隔,定时GET拉取JSON |
| 字段映射解析 | 用品牌模板的映射规则,从JSON中提取标准字段 |
| 实时状态更新 | 更新机床表的在线状态、采集状态、程序名、零件数等 |
| 产量分段追踪 | 检测NC程序名切换、零件数手动清零,自动结账/开段 |
| 日终汇总 | 按自然天汇总产量到机床+程序级别、工人级别 |
| 原始数据存档 | 存原始JSON到日志库,支持事后重解析 |
| 心跳上报 | 定时写心跳记录,供管理后台判断服务是否在线 |
| 告警生成 | 采集失败、未知设备、字段缺失等异常生成告警记录 |
1.2 技术栈
| 项 | 选型 | 说明 |
|---|---|---|
| 框架 | .NET Framework 4.7.2 Windows Service | 与Web API同框架 |
| ORM | Dapper | 复用现有CncRepository层 |
| HTTP客户端 | System.Net.Http.HttpClient | 拉取采集地址JSON |
| JSON解析 | Newtonsoft.Json 12.0.3 | 与Web API版本一致 |
| 日志 | log4net | 文件日志 + DB日志双写 |
| 数据库 | MariaDB | 业务库 + 日志库双库 |
| 管理API | HttpListener(轻量自托管) | 5800端口,独立于IIS |
1.3 与现有系统的关系
┌─────────────────────────────────────────────────┐
│ Windows Server │
│ │
│ ┌──────────────┐ ┌───────────────────────┐ │
│ │ IIS │ │ CncCollector │ │
│ │ CncWebApi │ │ (Windows Service) │ │
│ │ :80 │ │ 管理API :5800 │ │
│ │ 管理后台/大屏 │ │ 采集循环+Ping+汇总 │ │
│ └──────┬───────┘ └───────────┬───────────┘ │
│ │ │ │
│ └──────────┬─────────────┘ │
│ ▼ │
│ ┌────────────────────┐ │
│ │ MariaDB │ │
│ │ cnc_business │ │
│ │ cnc_log │ │
│ └────────────────────┘ │
└─────────────────────────────────────────────────┘
- CncCollector 复用现有的
CncModels、CncRepository层(直接引用项目) - 不复用
CncService层(采集服务有独立的业务逻辑,不依赖Web API的Service) - 通过数据库共享数据,不直接调用Web API
二、项目结构
2.1 新建项目
在解决方案中新增 CncCollector 项目:
src/CncCollector/
├── CncCollector.csproj ← .NET Framework 4.7.2 类库
├── ServiceEntry.cs ← Windows Service 入口(ServiceBase子类)
├── ServiceInstaller.cs ← 安装器(Installer子类,installutil用)
├── Program.cs ← 主入口(Main)
│
├── Engine/
│ ├── CollectorEngine.cs ← 引擎主控(启停所有任务、生命周期管理)
│ ├── CollectorTask.cs ← 单个采集地址的采集循环
│ └── PingTask.cs ← 单个采集地址的Ping循环
│
├── Parser/
│ ├── FieldMapper.cs ← 字段映射解析器(纯函数,输入JSON+映射规则→标准字段字典)
│ └── ValueConverter.cs ← 数值转换(去.00000尾缀,string/number类型转换)
│
├── Tracker/
│ └── ProductionTracker.cs ← 产量分段追踪(检测程序切换/手动清零→结账/开段)
│
├── Jobs/
│ └── DailySummaryJob.cs ← 日终汇总任务(关闭活跃段→合并产量→工人汇总→日状态)
│
├── Config/
│ ├── ConfigProvider.cs ← 配置热更新(DB轮询+HTTP通知双保险)
│ └── CollectorConfig.cs ← 配置快照数据结构
│
├── Heartbeat/
│ └── HeartbeatReporter.cs ← 心跳上报(定时写log_collector_heartbeat)
│
├── Api/
│ ├── ManagementServer.cs ← 轻量HTTP管理API(HttpListener自托管)
│ └── ManagementController.cs ← API端点实现(状态/重载/启停/汇总)
│
├── Logging/
│ └── DualLogger.cs ← 双写日志(文件log4net + DB log_system)
│
└── App.config ← 连接串、端口、API Key、log4net配置
2.2 项目引用
CncCollector → CncRepository → CncModels
↘
(不引用 CncService、CncWebApi)
CncCollector 直接使用 Repository 层操作数据库,不经过 Service 层。原因:
- 采集服务有大量独立的业务逻辑(字段映射、产量分段、汇总计算),与Web API的CRUD Service职责不同
- 避免与Web API产生耦合(Service层可能被修改影响采集服务)
三、核心流程设计
3.1 采集地址的数据模型
采集地址(cnc_collect_address)
├── URL:HTTP端点地址(如 http://192.168.1.101/)
├── 品牌:决定字段映射规则
├── 采集间隔:本地址独立配置
├── 关联机床:该地址下要采的机床列表
└── 启停状态:is_enabled
一次HTTP GET 返回:
[ ← JSON数组
{ device:"fanake_1.8", desc:"西-1.8", tags:[{id:"Tag5",value:"1566.NC"},...] },
{ device:"fanake_1.9", desc:"西-1.9", tags:[{id:"Tag5",value:"O1"},...] },
...
]
device值 匹配 cnc_machine.device_code → 定位具体机床
tags数组 通过品牌字段映射规则 → 提取标准字段
3.2 双循环模型(Ping + 采集)
每个启用的采集地址启动两个独立的 Task:
3.2.1 Ping循环
PingTask(address):
while(服务运行中 && address启用):
try:
success = Ping(address.Url) // ICMP Ping采集地址
catch:
success = false
更新该地址下所有机床:
is_online = success ? 1 : 0
last_ping_time = NOW()
sleep(ping_interval) // 全局配置,默认60秒
关键点:
- Ping结果仅用于管理后台/大屏显示机床在线状态
- Ping不通不影响采集逻辑——采集循环照常HTTP拉取
- 机床表的
ip_address字段不参与Ping逻辑,仅信息记录 - Ping是针对采集地址URL(HTTP端点),不是单台机床
3.2.2 采集循环
CollectTask(address):
while(服务运行中 && address启用):
roundStart = NOW()
// 检查重载/停止请求(本轮开始前)
if(停止请求): 安全退出
if(重载请求): 执行重载后继续
try:
// 1. HTTP拉取
responseJson, duration = HTTP GET address.Url
// 2. 原始JSON存档(日志库,先于解析,防止崩溃丢数据)
写入 log_collect_raw(is_success=1, raw_json=responseJson)
// 3. 按device逐台解析
devices = 解析JSON数组
for each device in devices:
处理单个设备(device, address)
// 4. 更新采集地址状态
address.last_collect_time = NOW()
address.last_collect_status = "success"
address.fail_count = 0
catch(异常 ex):
处理采集失败(address, ex)
// 本轮完成,检查挂起的停止/重载请求
if(停止请求): 安全退出
if(重载请求): 执行重载
// 计算本轮耗时,sleep剩余间隔
elapsed = NOW() - roundStart
sleep(max(0, collect_interval - elapsed))
关键点:
- 原始JSON先写入日志库再解析,防止解析过程中服务崩溃导致数据丢失
- 每轮采集结束才检查停止/重载请求,保证不会中断中间状态
- 采集间隔扣除本轮耗时,保证频率稳定
3.3 处理单个设备
处理单个设备(deviceObj, address):
// 1. 匹配机床
deviceCode = deviceObj[brand.deviceField] // 默认取 device 字段
machine = 按 device_code 查机床表
if(machine == null):
// 未知设备 → 告警(带完整JSON)
创建告警(alert_type="unknown_device", detail=deviceObj完整JSON)
return
if(machine.is_enabled == 0):
return // 已停用的机床跳过
// 2. 字段映射解析
fields = FieldMapper.Map(deviceObj, address.字段映射规则)
// fields = { program_name:"1566.NC", part_count:1219, device_status:1, ... }
// 3. 处理缺失字段
missingRequired = 检查必填字段缺失情况
if(missingRequired.Count > 0):
创建告警(alert_type="collect_fail", machine_id=machine.id,
detail="缺失字段: " + string.Join(", ", missingRequired))
// 缺失字段写null,不跳过整条记录
// 4. 写结构化记录(业务库 cnc_collect_record)
写入 CollectRecord(machine_id, collect_time=NOW(), fields)
// 5. 更新机床实时状态
更新 cnc_machine:
last_collect_time = NOW()
last_program_name = fields.program_name
last_part_count = fields.part_count
last_device_status = fields.device_status
last_run_status = fields.run_status
last_operate_mode = fields.operate_mode
last_machining_status = fields.machining_status
// 6. 产量分段追踪
ProductionTracker.Track(machine.id, fields.program_name, fields.part_count)
3.4 字段映射解析器
输入
deviceObj = { "device": "fanake_1.8", "desc": "西-1.8", "tags": [...] }
mappings = [
{ standard_field: "program_name", field_name: "Tag5", match_by: "id", data_type: "string" },
{ standard_field: "part_count", field_name: "Tag8", match_by: "id", data_type: "number" },
{ standard_field: "device_status",field_name: "_io_status", match_by: "id", data_type: "number" },
...
]
解析逻辑
FieldMapper.Map(deviceObj, mappings) → Dictionary<string, object>
对每条映射规则:
1. 在 deviceObj.tags 数组中查找匹配:
match_by == "id" → 找 tag.id == field_name
match_by == "desc" → 找 tag.desc == field_name
2. 找到则提取 tag.value:
data_type == "number" → 去除".00000"尾缀,转decimal
data_type == "string" → 直接取字符串
3. 未找到:
结果为 null
if(is_required == 1) → 加入缺失列表
数值转换规则
ValueConverter.Convert(rawValue, dataType):
if(dataType == "number"):
// "1219.00000" → 1219
// "0.00000" → 0
// "1566.NC" → null(非数字,不抛异常)
str = rawValue.Trim()
if(decimal.TryParse(str, out result)):
return result
else:
return null
if(dataType == "string"):
return rawValue
3.5 产量分段追踪
核心状态机
每台机床的产量追踪状态 = cnc_production_segment 表中的一条活跃记录(is_settled=0, end_time=NULL)
触发结账的事件:
1. NC程序名变化 → close_reason = "program_change"
2. 同程序下part_count下降(手动清零) → close_reason = "manual_reset"
3. 日终汇总时仍在运行 → close_reason = "end_of_day"
4. 服务停止/崩溃恢复 → close_reason = "service_stop" / "service_crash"
Track逻辑伪代码
ProductionTracker.Track(machineId, currentProgramName, currentPartCount):
activeSegment = repository.GetActiveSegment(machineId)
if(activeSegment == null):
// 无活跃段 → 开新段
CreateSegment(machineId, currentProgramName, currentPartCount)
return
if(activeSegment.program_name != currentProgramName):
// NC程序切换 → 结账旧段,开新段
quantity = activeSegment.start_part_count 不变(上一段的part_count快照)
// 实际产量 = currentPartCount - activeSegment.start_part_count
// 但注意:旧的activeSegment.start_part_count是旧程序开始时的值
CloseSegment(activeSegment.id,
endPartCount = currentPartCount,
quantity = currentPartCount - activeSegment.start_part_count,
closeReason = "program_change")
CreateSegment(machineId, currentProgramName, currentPartCount)
return
if(currentPartCount < activeSegment.start_part_count):
// 同程序下part_count下降 → 手动清零
// 结账旧段(产量 = 清零前的值 - 段起始值)
CloseSegment(activeSegment.id,
endPartCount = 上一次采集的part_count(需要内存缓存),
quantity = 上一次采集的part_count - activeSegment.start_part_count,
closeReason = "manual_reset")
// 开新段,从当前值(清零后的值)开始
CreateSegment(machineId, currentProgramName, currentPartCount)
return
// 正常运行 → 无需操作,活跃段继续
// 可选:更新内存中"上一次采集的part_count"缓存
part_count缓存
为了检测手动清零,需要知道上一次采集的part_count。在内存中维护:
Dictionary<int, decimal?> _lastPartCountByMachine // machineId → 上次part_count
每次采集后更新:
_lastPartCountByMachine[machineId] = currentPartCount
手动清零判断条件:
currentPartCount < activeSegment.start_part_count
(零件数比段起始值还小,说明被手动清零了)
A-B-C-A-B场景
时间线:
10:00 program=A, part=0 → 开段(A, start=0)
10:30 program=A, part=10 → 正常
11:00 program=B, part=0 → 结段(A, qty=10), 开段(B, start=0)
11:30 program=C, part=0 → 结段(B, qty=0), 开段(C, start=0)
12:00 program=A, part=0 → 结段(C, qty=0), 开段(A, start=0) ← 新段,不合并
12:30 program=A, part=5 → 正常
日汇总时:
按 (machine_id, production_date, program_name) 合并
program=A 的总产量 = 10 + 5 = 15
program=B 的总产量 = 0
program=C 的总产量 = 0
四、采集失败与重试
4.1 重试机制
一轮采集循环中的重试:
第1次采集 → 失败
sleep(retry_interval) // 默认30秒,后台可配
第2次采集 → 失败
sleep(retry_interval)
第3次采集 → 失败(达到 retry_count 上限,默认3次,后台可配)
重试总耗时 = retry_count × retry_interval
if(重试总耗时 < collect_interval):
// 正常情况:重试耗时不影响下一轮
fail_count++
sleep(collect_interval - 重试总耗时) // 等待剩余间隔
else:
// 异常情况:重试耗时超过了采集间隔
// 重置本轮错误状态(不算一次有效失败)
// 直接进入下一轮采集
不增加 fail_count
立即开始下一轮
if(fail_count >= collect_fail_alert_threshold): // 默认5次
创建告警(alert_type="collect_fail")
fail_count = 0 // 重置,避免重复告警
4.2 配置约束
| 配置项 | 默认值 | 说明 |
|---|---|---|
collect_retry_count |
3 | 每轮最大重试次数 |
collect_retry_interval |
30 | 重试间隔(秒) |
collect_fail_alert_threshold |
5 | 连续失败N轮触发告警 |
约束:正常配置下 retry_count × retry_interval < collect_interval。如果不满足,重试超时不算失败。
4.3 fail_count 连续性
- 采集成功一次 →
fail_count立即清零 - 只有连续失败才累积
- 达到阈值触发告警后清零,避免同一问题重复告警
五、配置热更新
5.1 双保险机制
管理后台修改 → 写DB → HTTP通知采集服务 → 采集服务验证生效
┌─── 即时通知(HTTP POST /api/reload)───┐
│ │
管理后台写DB ──┤ ├──→ 采集服务重载配置
│ │
└─── 兜底轮询(每30秒查DB)─────────────────┘
5.2 重载流程
1. 管理后台修改采集地址/品牌模板 → 写入数据库
2. 管理后台调用采集服务API:
POST http://localhost:5800/api/reload
Headers: X-Api-Key: {collector_api_key}
3. 采集服务收到重载请求:
a. 标记"重载请求挂起",不立即中断
b. 等待所有地址当前轮采集完成
c. 从DB重新加载配置:
- cnc_collect_address(启用的地址列表)
- cnc_brand_field_mapping(字段映射规则)
- cnc_sys_config(系统配置)
d. 对比新旧配置,生成变更摘要
e. 调整运行中的任务:
- 新增的地址 → 启动新的Ping+采集Task
- 停用的地址 → 停止对应Task
- 变更的地址 → 停止旧Task,启动新Task
f. 返回变更结果
4. 管理后台验证:
GET http://localhost:5800/api/config/hash
对比本地DB配置的hash → 一致则确认生效
5.3 配置快照
内存中维护的配置数据结构:
class CollectorConfig
{
string ConfigHash; // MD5指纹,快速判断是否变更
List<CollectAddress> Addresses; // 启用的采集地址列表
Dictionary<int, List<BrandFieldMapping>> BrandMappings; // brandId → 字段映射
Dictionary<string, string> SysConfig; // 系统配置键值对
DateTime LoadedAt; // 加载时间
}
5.4 管理后台交互
管理后台修改采集地址/品牌模板后:
- 保存按钮旁显示"已通知采集服务重载 ✓"或"通知失败,将在30秒后自动同步 ⚠"
- 通过Web API后端转发HTTP请求到采集服务(前端不直连5800端口)
六、日终汇总
6.1 触发机制(启动时补检 + 内置定时器)
DailySummaryJob:
// 启动时自检
OnStart():
yesterday = DateTime.Today.AddDays(-1)
if(昨天尚未汇总):
立即执行昨天的汇总
// 定时检查(每分钟)
Timer(60秒):
now = DateTime.Now
targetDate = DateTime.Today.AddDays(-1) // 汇总昨天
if(now >= 配置的daily_summary_time): // 默认01:00
if(今天尚未执行过汇总):
执行汇总(targetDate)
记录"今天已汇总"
// HTTP API 手动触发
OnManualTrigger(date):
执行汇总(date) // 幂等:先删旧数据再重算
优势:
- 服务01:00挂了,02:00恢复后自动补上
- 手动触发支持重算任意日期
- 幂等设计:同一天汇总多次不会重复
6.2 汇总步骤
DailySummaryJob.Execute(targetDate):
for each 启用的机床:
// Step 1: 关闭该机床的所有活跃段
activeSegments = 查询(machine_id, is_settled=0, end_time=NULL)
for each segment in activeSegments:
CloseSegment(segment.id,
endPartCount = lastPartCount缓存或段内最大part_count,
quantity = endPartCount - segment.start_part_count,
closeReason = "end_of_day")
// Step 2: 标记结算
SettleByDate(targetDate)
// Step 3: 按程序合并 → cnc_daily_production
segments = 查询该机床targetDate的所有段
grouped = segments.GroupBy(s => s.program_name)
for each group:
UpsertDailyProduction(
machine_id, targetDate, program_name,
total_quantity = Sum(group.quantity),
segment_count = group.Count()
)
// Step 4: 写机床日状态
UpsertMachineDailyStatus(
machine_id, targetDate,
data_status = 根据当天采集情况判断:
"normal" → 至少成功采集过一次
"offline" → 当天从未Ping通
"data_missing" → Ping通但采集始终失败
)
// Step 5: 工人汇总 → cnc_worker_daily_summary
for each worker:
machines = 该工人绑定的机床列表
workerProductions = 查这些机床在targetDate的daily_production
UpsertWorkerDailySummary(
worker_id, targetDate,
total_quantity = Sum(workerProductions.total_quantity),
machine_count = 去重机床数,
program_count = 去重程序名数
)
// Step 6: 未开机机床插0产量记录
allEnabledMachines = 所有启用的机床
machinesWithData = 当天有daily_production记录的机床
for each machine in (allEnabledMachines - machinesWithData):
InsertDailyProduction(machine.id, targetDate, quantity=0)
UpsertMachineDailyStatus(machine.id, targetDate, "offline")
七、心跳上报
7.1 心跳机制
HeartbeatReporter:
每 heartbeat_interval 秒(默认10秒,可配):
写入 log_collector_heartbeat:
service_id = "CncCollector"
status = "running"
collect_address_id = null(全局心跳)
last_collect_time = 最近一次成功采集时间
success_count = 本轮成功采集次数
fail_count = 本轮失败采集次数
uptime_seconds = (NOW() - 服务启动时间).TotalSeconds
detail = JSON.stringify({
activeTasks: 活跃采集任务数,
activePings: 活跃Ping任务数,
configHash: 当前配置指纹
})
7.2 Web API判断服务状态
-- 管理后台/大屏查询采集服务是否在线
SELECT * FROM log_collector_heartbeat
WHERE service_id = 'CncCollector'
AND created_at > NOW() - INTERVAL 30 SECOND
ORDER BY created_at DESC LIMIT 1
-- 有记录 → 运行中;无记录 → 已停止
八、日志系统
8.1 双写策略
| 事件类型 | 文件日志 | DB日志(log_system) |
|---|---|---|
| 每次采集成功 | ✅ INFO | ❌ 不写(量太大) |
| 采集失败/重试 | ✅ ERROR | ✅ ERROR |
| 字段缺失告警 | ✅ WARN | ❌(写入cnc_alert) |
| 未知设备告警 | ✅ WARN | ❌(写入cnc_alert) |
| 配置变更/重载 | ✅ INFO | ✅ INFO |
| 服务启停 | ✅ INFO | ✅ INFO |
| 日终汇总 | ✅ INFO | ✅ INFO |
| 异常崩溃 | ✅ FATAL | ✅ ERROR(如果还能写的话) |
| Ping状态变化 | ✅ DEBUG | ❌ |
8.2 文件日志格式
logs/collector-2026-04-30.log
[2026-04-30 01:00:02 INFO ] 日终汇总开始,目标日期:2026-04-29
[2026-04-30 01:00:05 INFO ] 日终汇总完成,处理160台机床,耗时3.2秒
[2026-04-30 01:00:30 INFO ] 地址[FANUC-1号] 采集成功,获取24台设备数据,耗时120ms
[2026-04-30 01:01:00 WARN ] 机床[西-1.8](id=5) 必填字段缺失: spindle_speed_set
[2026-04-30 01:02:00 ERROR] 地址[FANUC-2号] 采集失败(第3次重试): 连接超时
[2026-04-30 01:02:05 INFO ] 配置重载完成:新增地址1个,停用地址1个,更新映射3条
[2026-04-30 01:02:30 WARN ] 未知设备 device=fanake_1.15,采集地址[FANUC-1号]
8.3 log4net配置要点
<!-- App.config 中 log4net 配置 -->
<log4net>
<appender name="RollingFile" type="log4net.Appender.RollingFileAppender">
<file value="logs/collector" />
<datePattern value="-yyyy-MM-dd'.log'" />
<maximumFileSize value="50MB" />
<maxSizeRollBackups value="10" />
<layout type="log4net.Layout.PatternLayout">
<conversionPattern value="[%date %-5level] %message%newline" />
</layout>
</appender>
<root>
<level value="INFO" />
<appender-ref ref="RollingFile" />
</root>
</log4net>
九、优雅停止与异常恢复
9.1 优雅停止
核心原则:收到停止请求后,等待所有地址当前轮采集完成再退出
CollectorEngine.Stop():
1. 标记 _stopRequested = true
2. 日志记录"收到停止请求,等待当前轮采集完成"
3. 等待所有CollectorTask和PingTask完成(带超时)
Task.WaitAll(allTasks, timeout: 30秒)
4. 关闭所有活跃的产量分段:
for each 活跃段:
CloseSegment(reason="service_stop")
5. 写最终心跳(status="stopped")
6. 日志记录"服务已停止"
7. 释放资源
9.2 配置重载时的保护
CollectorEngine.Reload():
1. 标记 _reloadRequested = true
2. 日志记录"收到重载请求,等待当前轮采集完成"
3. 不中断任何正在执行的采集任务
4. 等待所有任务到达本轮结束点(循环顶部检查标记)
5. 从DB重新加载配置
6. 对比新旧配置,生成变更摘要
7. 停掉不再需要的任务
8. 启动新任务
9. 清除 _reloadRequested 标记
10. 返回变更摘要
9.3 异常停止(崩溃/强杀)的数据防护
产量分段——天然事务边界
采集服务对产量分段的操作只有两种原子动作:
1. 开新段:INSERT → end_time=NULL
2. 结旧段:UPDATE → 设置end_time
最坏情况:结旧段成功但开新段失败
后果:机床没有活跃段
恢复:下次服务启动时自检恢复(见下)
数据丢失:最多丢失一次采集的产量,不会出现脏数据
服务启动时自检恢复
CollectorEngine.OnStart():
1. 加载配置
2. 自检恢复:
activeSegments = 查询所有 is_settled=0 AND end_time=NULL 的段
for each segment:
if(segment.start_time 距今 > 2小时):
// 很可能是崩溃遗留的孤儿段
CloseSegment(segment.id,
endPartCount = segment.start_part_count,
quantity = 0,
closeReason = "service_crash",
endTime = segment.start_time.AddMinutes(5))
3. 启动Ping+采集任务
4. 自检日终汇总
原始JSON先写后解析
采集流程中的写入顺序:
第1步:写入 log_collect_raw(原始JSON + is_success=1) ← 先保存
第2步:逐台解析 + 写 cnc_collect_record ← 后处理
如果第2步中途崩溃:
- 原始JSON已安全保存
- 可通过管理API手动触发"重解析":按log_collect_raw的记录重新跑字段映射
采集记录——天然原子性
每台设备的结构化数据是一条独立的 INSERT
即使中途崩溃,已写入的记录是完整的
未写入的记录只是缺失,下次采集会补上
十、管理API
10.1 端点列表
| 方法 | URL | 说明 | 认证 |
|---|---|---|---|
| GET | /api/status | 服务状态(运行时长、任务数、各地址状态) | X-Api-Key |
| POST | /api/reload | 立即重载配置(等本轮完成后执行) | X-Api-Key |
| POST | /api/stop | 停止所有采集任务 | X-Api-Key |
| POST | /api/start | 启动所有采集任务 | X-Api-Key |
| GET | /api/tasks | 当前运行的任务列表 | X-Api-Key |
| GET | /api/config/snapshot | 当前生效的配置快照 | X-Api-Key |
| GET | /api/config/hash | 配置指纹(MD5) | X-Api-Key |
| POST | /api/daily-summary | 手动触发日终汇总,参数: date=2026-04-29 | X-Api-Key |
| POST | /api/reparse | 重解析指定原始记录,参数: rawId=12345 | X-Api-Key |
10.2 认证
所有管理API请求需携带 Header X-Api-Key,值与 cnc_sys_config 表中的 collector_api_key 一致。
10.3 实现方式
使用 System.Net.HttpListener 自托管,独立于IIS:
// ManagementServer.cs
HttpListener listener = new HttpListener();
listener.Prefixes.Add("http://+:{port}/api/");
listener.Start();
// 请求处理循环
while(running)
{
var context = listener.GetContext();
// 路由到 ManagementController 对应方法
}
10.4 响应格式
{
"code": 0,
"message": "success",
"data": { ... }
}
与Web API统一格式。
十一、告警类型汇总
| alert_type | 触发条件 | detail内容 |
|---|---|---|
collect_fail |
连续N轮采集失败 | 失败原因摘要 |
collect_fail |
必填字段缺失 | 缺失字段列表 |
unknown_device |
device匹配不到机床表 | 完整device JSON对象 |
device_offline |
(预留,当前Ping状态直接更新机床表) | - |
service_error |
采集服务内部异常 | 异常消息+堆栈 |
告警频率控制
- 同一台机床同一个问题,5分钟内不重复告警
- 未知设备告警:同一device编码,1小时内不重复
- 告警表不记录采集成功的正常事件
十二、App.config 配置项
<configuration>
<connectionStrings>
<add name="BusinessConnection"
connectionString="Server=localhost;Database=cnc_business;Uid=root;Pwd=root;Charset=utf8mb4;SslMode=None;" />
<add name="LogConnection"
connectionString="Server=localhost;Database=cnc_log;Uid=root;Pwd=root;Charset=utf8mb4;SslMode=None;" />
</connectionStrings>
<appSettings>
<!-- 服务标识 -->
<add key="ServiceId" value="CncCollector" />
<!-- 管理API -->
<add key="ManagementApiPort" value="5800" />
<add key="CollectorApiKey" value="与cnc_sys_config.collector_api_key一致" />
<!-- 采集参数(首次启动默认值,后续从DB读取) -->
<add key="PingInterval" value="60" />
<add key="CollectRetryCount" value="3" />
<add key="CollectRetryInterval" value="30" />
<add key="CollectFailAlertThreshold" value="5" />
<add key="HeartbeatInterval" value="10" />
<add key="ConfigPollInterval" value="30" />
<!-- 日终汇总 -->
<add key="DailySummaryTime" value="01:00" />
<!-- 日志 -->
<add key="LogLevel" value="INFO" />
</appSettings>
</configuration>
十三、数据库交互清单
13.1 采集服务读取的表
| 表 | 库 | 用途 | 频率 |
|---|---|---|---|
| cnc_collect_address | 业务 | 加载启用的采集地址列表 | 每30秒/重载时 |
| cnc_brand_field_mapping | 业务 | 加载品牌字段映射规则 | 每30秒/重载时 |
| cnc_brand | 业务 | 获取品牌device_field/tags_path | 每30秒/重载时 |
| cnc_machine | 业务 | 按device_code匹配机床 | 每次采集 |
| cnc_production_segment | 业务 | 查活跃段 | 每次采集每台机床 |
| cnc_worker_machine | 业务 | 查机床绑定的工人(日汇总用) | 日汇总时 |
| cnc_sys_config | 业务 | 读取系统配置 | 每30秒/重载时 |
| log_collect_raw | 日志 | 读取原始记录(重解析用) | 按需 |
13.2 采集服务写入的表
| 表 | 库 | 用途 | 频率 |
|---|---|---|---|
| log_collect_raw | 日志 | 存原始JSON | 每次采集 |
| log_collector_heartbeat | 日志 | 心跳 | 每10秒 |
| log_system | 日志 | 关键事件日志 | 关键事件时 |
| cnc_collect_record | 业务 | 结构化采集记录 | 每次采集每台机床 |
| cnc_production_segment | 业务 | 开段/结段 | 程序切换/清零时 |
| cnc_daily_production | 业务 | 日汇总产量 | 日汇总时 |
| cnc_worker_daily_summary | 业务 | 工人日汇总 | 日汇总时 |
| cnc_machine_daily_status | 业务 | 机床日状态 | 日汇总时 |
| cnc_machine | 业务 | 更新实时状态字段 | 每次采集每台机床 |
| cnc_collect_address | 业务 | 更新last_collect_time/fail_count | 每次采集 |
| cnc_alert | 业务 | 告警记录 | 异常时 |
13.3 需要新增的Repository方法
以下方法在现有接口中不存在,需要新增:
// ICollectRecordRepository(新建)
long Create(CollectRecord entity);
// IMachineRepository(补充)
void UpdateRealtimeStatus(int machineId, Machine realtimeFields);
// IDailyProductionRepository(补充)
int Upsert(DailyProduction entity); // INSERT ON DUPLICATE KEY UPDATE
void DeleteByMachineAndDate(int machineId, DateTime date);
// IWorkerDailySummaryRepository(补充)
int Upsert(WorkerDailySummary entity);
十四、部署与运维
14.1 安装为Windows Service
# 安装
installutil CncCollector.exe
# 卸载
installutil /u CncCollector.exe
# 或使用 sc 命令
sc create CncCollector binPath= "E:\opencode\haoliang\src\CncCollector\bin\Debug\CncCollector.exe"
sc start CncCollector
sc stop CncCollector
14.2 文件部署位置
E:\opencode\haoliang\src\CncCollector\bin\Debug\
├── CncCollector.exe
├── CncCollector.exe.config ← App.config 编译输出
├── CncModels.dll ← 引用的模型层
├── CncRepository.dll ← 引用的仓储层
├── Dapper.dll
├── MySqlConnector.dll
├── Newtonsoft.Json.dll
├── log4net.dll
└── logs\ ← 日志目录(自动创建)
14.3 与Web API共享数据库
- 采集服务和Web API同时读写同一个MariaDB
- 采集服务只写自己的表(采集记录、产量段、日汇总等),不会覆盖Web API管理的配置数据
- 配置变更通过DB传递,双保险保证一致
14.4 防火墙
- 开放5800端口供管理API通信(仅本机/局域网访问)
- 采集服务需要出站HTTP访问采集地址URL
- Ping需要ICMP出站权限
十五、开发顺序建议
| 步骤 | 内容 | 依赖 |
|---|---|---|
| 1 | 新建CncCollector项目 + 引用CncModels/CncRepository + App.config | 无 |
| 2 | FieldMapper + ValueConverter(纯函数,可独立测试) | 步骤1 |
| 3 | CollectorTask 单地址采集循环(HTTP拉取+解析+写库) | 步骤2 |
| 4 | ProductionTracker 产量分段追踪 | 步骤1 |
| 5 | PingTask Ping循环 | 步骤1 |
| 6 | CollectorEngine 引擎(启停所有Task) | 步骤3+4+5 |
| 7 | HeartbeatReporter 心跳 | 步骤1 |
| 8 | ConfigProvider 配置热更新 | 步骤6 |
| 9 | DailySummaryJob 日终汇总 | 步骤4 |
| 10 | ManagementServer + ManagementController 管理API | 步骤6+8+9 |
| 11 | DualLogger 双写日志 | 步骤1 |
| 12 | ServiceEntry + ServiceInstaller Windows Service包装 | 步骤6+10 |
| 13 | 集成测试(完整链路) | 全部 |
建议先跑通 步骤1→2→3 这条最小链路(一个地址→拉取→解析→写库),再逐步叠加其他功能。