You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
haoliang-net/docs/06-采集服务设计.md

1013 lines
34 KiB
Markdown

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# CNC机床数据采集系统 - 采集服务设计文档
> 最后更新2026-04-30
> 状态:设计中
---
## 一、概述
采集服务CncCollector是一个独立 Windows Service负责定时从采集地址HTTP拉取JSON数据通过品牌字段映射解析为结构化数据实时更新机床状态追踪产量分段并执行日终汇总。
### 1.1 核心职责
| 职责 | 说明 |
|------|------|
| HTTP数据拉取 | 按采集地址配置的间隔定时GET拉取JSON |
| 字段映射解析 | 用品牌模板的映射规则从JSON中提取标准字段 |
| 实时状态更新 | 更新机床表的在线状态、采集状态、程序名、零件数等 |
| 产量分段追踪 | 检测NC程序名切换、零件数手动清零自动结账/开段 |
| 日终汇总 | 按自然天汇总产量到机床+程序级别、工人级别 |
| 原始数据存档 | 存原始JSON到日志库支持事后重解析 |
| 心跳上报 | 定时写心跳记录,供管理后台判断服务是否在线 |
| 告警生成 | 采集失败、未知设备、字段缺失等异常生成告警记录 |
### 1.2 技术栈
| 项 | 选型 | 说明 |
|----|------|------|
| 框架 | .NET Framework 4.7.2 Windows Service | 与Web API同框架 |
| ORM | Dapper | 复用现有CncRepository层 |
| HTTP客户端 | System.Net.Http.HttpClient | 拉取采集地址JSON |
| JSON解析 | Newtonsoft.Json 12.0.3 | 与Web API版本一致 |
| 日志 | log4net | 文件日志 + DB日志双写 |
| 数据库 | MariaDB | 业务库 + 日志库双库 |
| 管理API | HttpListener轻量自托管 | 5800端口独立于IIS |
### 1.3 与现有系统的关系
```
┌─────────────────────────────────────────────────┐
│ Windows Server │
│ │
│ ┌──────────────┐ ┌───────────────────────┐ │
│ │ IIS │ │ CncCollector │ │
│ │ CncWebApi │ │ (Windows Service) │ │
│ │ :80 │ │ 管理API :5800 │ │
│ │ 管理后台/大屏 │ │ 采集循环+Ping+汇总 │ │
│ └──────┬───────┘ └───────────┬───────────┘ │
│ │ │ │
│ └──────────┬─────────────┘ │
│ ▼ │
│ ┌────────────────────┐ │
│ │ MariaDB │ │
│ │ cnc_business │ │
│ │ cnc_log │ │
│ └────────────────────┘ │
└─────────────────────────────────────────────────┘
```
- CncCollector 复用现有的 `CncModels`、`CncRepository` 层(直接引用项目)
- 不复用 `CncService`采集服务有独立的业务逻辑不依赖Web API的Service
- 通过数据库共享数据不直接调用Web API
---
## 二、项目结构
### 2.1 新建项目
在解决方案中新增 `CncCollector` 项目:
```
src/CncCollector/
├── CncCollector.csproj ← .NET Framework 4.7.2 类库
├── ServiceEntry.cs ← Windows Service 入口ServiceBase子类
├── ServiceInstaller.cs ← 安装器Installer子类installutil用
├── Program.cs ← 主入口Main
├── Engine/
│ ├── CollectorEngine.cs ← 引擎主控(启停所有任务、生命周期管理)
│ ├── CollectorTask.cs ← 单个采集地址的采集循环
│ └── PingTask.cs ← 单个采集地址的Ping循环
├── Parser/
│ ├── FieldMapper.cs ← 字段映射解析器纯函数输入JSON+映射规则→标准字段字典)
│ └── ValueConverter.cs ← 数值转换(去.00000尾缀string/number类型转换
├── Tracker/
│ └── ProductionTracker.cs ← 产量分段追踪(检测程序切换/手动清零→结账/开段)
├── Jobs/
│ └── DailySummaryJob.cs ← 日终汇总任务(关闭活跃段→合并产量→工人汇总→日状态)
├── Config/
│ ├── ConfigProvider.cs ← 配置热更新DB轮询+HTTP通知双保险
│ └── CollectorConfig.cs ← 配置快照数据结构
├── Heartbeat/
│ └── HeartbeatReporter.cs ← 心跳上报定时写log_collector_heartbeat
├── Api/
│ ├── ManagementServer.cs ← 轻量HTTP管理APIHttpListener自托管
│ └── ManagementController.cs ← API端点实现状态/重载/启停/汇总)
├── Logging/
│ └── DualLogger.cs ← 双写日志文件log4net + DB log_system
└── App.config ← 连接串、端口、API Key、log4net配置
```
### 2.2 项目引用
```
CncCollector → CncRepository → CncModels
(不引用 CncService、CncWebApi
```
CncCollector 直接使用 Repository 层操作数据库,不经过 Service 层。原因:
- 采集服务有大量独立的业务逻辑字段映射、产量分段、汇总计算与Web API的CRUD Service职责不同
- 避免与Web API产生耦合Service层可能被修改影响采集服务
---
## 三、核心流程设计
### 3.1 采集地址的数据模型
```
采集地址cnc_collect_address
├── URLHTTP端点地址如 http://192.168.1.101/
├── 品牌:决定字段映射规则
├── 采集间隔:本地址独立配置
├── 关联机床:该地址下要采的机床列表
└── 启停状态is_enabled
一次HTTP GET 返回:
[ ← JSON数组
{ device:"fanake_1.8", desc:"西-1.8", tags:[{id:"Tag5",value:"1566.NC"},...] },
{ device:"fanake_1.9", desc:"西-1.9", tags:[{id:"Tag5",value:"O1"},...] },
...
]
device值 匹配 cnc_machine.device_code → 定位具体机床
tags数组 通过品牌字段映射规则 → 提取标准字段
```
### 3.2 双循环模型Ping + 采集)
每个启用的采集地址启动两个独立的 Task
#### 3.2.1 Ping循环
```
PingTask(address):
while(服务运行中 && address启用):
try:
success = Ping(address.Url) // ICMP Ping采集地址
catch:
success = false
更新该地址下所有机床:
is_online = success ? 1 : 0
last_ping_time = NOW()
sleep(ping_interval) // 全局配置默认60秒
```
**关键点**
- Ping结果仅用于管理后台/大屏显示机床在线状态
- Ping不通**不影响**采集逻辑——采集循环照常HTTP拉取
- 机床表的 `ip_address` 字段不参与Ping逻辑仅信息记录
- Ping是针对采集地址URLHTTP端点不是单台机床
#### 3.2.2 采集循环
```
CollectTask(address):
while(服务运行中 && address启用):
roundStart = NOW()
// 检查重载/停止请求(本轮开始前)
if(停止请求): 安全退出
if(重载请求): 执行重载后继续
try:
// 1. HTTP拉取
responseJson, duration = HTTP GET address.Url
// 2. 原始JSON存档日志库先于解析防止崩溃丢数据
写入 log_collect_raw(is_success=1, raw_json=responseJson)
// 3. 按device逐台解析
devices = 解析JSON数组
for each device in devices:
处理单个设备(device, address)
// 4. 更新采集地址状态
address.last_collect_time = NOW()
address.last_collect_status = "success"
address.fail_count = 0
catch(异常 ex):
处理采集失败(address, ex)
// 本轮完成,检查挂起的停止/重载请求
if(停止请求): 安全退出
if(重载请求): 执行重载
// 计算本轮耗时sleep剩余间隔
elapsed = NOW() - roundStart
sleep(max(0, collect_interval - elapsed))
```
**关键点**
- 原始JSON先写入日志库再解析防止解析过程中服务崩溃导致数据丢失
- 每轮采集结束才检查停止/重载请求,保证不会中断中间状态
- 采集间隔扣除本轮耗时,保证频率稳定
### 3.3 处理单个设备
```
处理单个设备(deviceObj, address):
// 1. 匹配机床
deviceCode = deviceObj[brand.deviceField] // 默认取 device 字段
machine = 按 device_code 查机床表
if(machine == null):
// 未知设备 → 告警带完整JSON
创建告警(alert_type="unknown_device", detail=deviceObj完整JSON)
return
if(machine.is_enabled == 0):
return // 已停用的机床跳过
// 2. 字段映射解析
fields = FieldMapper.Map(deviceObj, address.字段映射规则)
// fields = { program_name:"1566.NC", part_count:1219, device_status:1, ... }
// 3. 处理缺失字段
missingRequired = 检查必填字段缺失情况
if(missingRequired.Count > 0):
创建告警(alert_type="collect_fail", machine_id=machine.id,
detail="缺失字段: " + string.Join(", ", missingRequired))
// 缺失字段写null不跳过整条记录
// 4. 写结构化记录(业务库 cnc_collect_record
写入 CollectRecord(machine_id, collect_time=NOW(), fields)
// 5. 更新机床实时状态
更新 cnc_machine:
last_collect_time = NOW()
last_program_name = fields.program_name
last_part_count = fields.part_count
last_device_status = fields.device_status
last_run_status = fields.run_status
last_operate_mode = fields.operate_mode
last_machining_status = fields.machining_status
// 6. 产量分段追踪
ProductionTracker.Track(machine.id, fields.program_name, fields.part_count)
```
### 3.4 字段映射解析器
#### 输入
```
deviceObj = { "device": "fanake_1.8", "desc": "西-1.8", "tags": [...] }
mappings = [
{ standard_field: "program_name", field_name: "Tag5", match_by: "id", data_type: "string" },
{ standard_field: "part_count", field_name: "Tag8", match_by: "id", data_type: "number" },
{ standard_field: "device_status",field_name: "_io_status", match_by: "id", data_type: "number" },
...
]
```
#### 解析逻辑
```
FieldMapper.Map(deviceObj, mappings) → Dictionary<string, object>
对每条映射规则:
1. 在 deviceObj.tags 数组中查找匹配:
match_by == "id" → 找 tag.id == field_name
match_by == "desc" → 找 tag.desc == field_name
2. 找到则提取 tag.value:
data_type == "number" → 去除".00000"尾缀转decimal
data_type == "string" → 直接取字符串
3. 未找到:
结果为 null
if(is_required == 1) → 加入缺失列表
```
#### 数值转换规则
```
ValueConverter.Convert(rawValue, dataType):
if(dataType == "number"):
// "1219.00000" → 1219
// "0.00000" → 0
// "1566.NC" → null非数字不抛异常
str = rawValue.Trim()
if(decimal.TryParse(str, out result)):
return result
else:
return null
if(dataType == "string"):
return rawValue
```
### 3.5 产量分段追踪
#### 核心状态机
```
每台机床的产量追踪状态 = cnc_production_segment 表中的一条活跃记录is_settled=0, end_time=NULL
触发结账的事件:
1. NC程序名变化 → close_reason = "program_change"
2. 同程序下part_count下降手动清零 → close_reason = "manual_reset"
3. 日终汇总时仍在运行 → close_reason = "end_of_day"
4. 服务停止/崩溃恢复 → close_reason = "service_stop" / "service_crash"
```
#### Track逻辑伪代码
```
ProductionTracker.Track(machineId, currentProgramName, currentPartCount):
activeSegment = repository.GetActiveSegment(machineId)
if(activeSegment == null):
// 无活跃段 → 开新段
CreateSegment(machineId, currentProgramName, currentPartCount)
return
if(activeSegment.program_name != currentProgramName):
// NC程序切换 → 结账旧段,开新段
quantity = activeSegment.start_part_count 不变上一段的part_count快照
// 实际产量 = currentPartCount - activeSegment.start_part_count
// 但注意旧的activeSegment.start_part_count是旧程序开始时的值
CloseSegment(activeSegment.id,
endPartCount = currentPartCount,
quantity = currentPartCount - activeSegment.start_part_count,
closeReason = "program_change")
CreateSegment(machineId, currentProgramName, currentPartCount)
return
if(currentPartCount < activeSegment.start_part_count):
// 同程序下part_count下降 → 手动清零
// 结账旧段(产量 = 清零前的值 - 段起始值)
CloseSegment(activeSegment.id,
endPartCount = 上一次采集的part_count需要内存缓存,
quantity = 上一次采集的part_count - activeSegment.start_part_count,
closeReason = "manual_reset")
// 开新段,从当前值(清零后的值)开始
CreateSegment(machineId, currentProgramName, currentPartCount)
return
// 正常运行 → 无需操作,活跃段继续
// 可选:更新内存中"上一次采集的part_count"缓存
```
#### part_count缓存
为了检测手动清零,需要知道**上一次采集的part_count**。在内存中维护:
```
Dictionary<int, decimal?> _lastPartCountByMachine // machineId → 上次part_count
每次采集后更新:
_lastPartCountByMachine[machineId] = currentPartCount
手动清零判断条件:
currentPartCount < activeSegment.start_part_count
(零件数比段起始值还小,说明被手动清零了)
```
#### A-B-C-A-B场景
```
时间线:
10:00 program=A, part=0 → 开段(A, start=0)
10:30 program=A, part=10 → 正常
11:00 program=B, part=0 → 结段(A, qty=10), 开段(B, start=0)
11:30 program=C, part=0 → 结段(B, qty=0), 开段(C, start=0)
12:00 program=A, part=0 → 结段(C, qty=0), 开段(A, start=0) ← 新段,不合并
12:30 program=A, part=5 → 正常
日汇总时:
按 (machine_id, production_date, program_name) 合并
program=A 的总产量 = 10 + 5 = 15
program=B 的总产量 = 0
program=C 的总产量 = 0
```
---
## 四、采集失败与重试
### 4.1 重试机制
```
一轮采集循环中的重试:
第1次采集 → 失败
sleep(retry_interval) // 默认30秒后台可配
第2次采集 → 失败
sleep(retry_interval)
第3次采集 → 失败(达到 retry_count 上限默认3次后台可配
重试总耗时 = retry_count × retry_interval
if(重试总耗时 < collect_interval):
// 正常情况:重试耗时不影响下一轮
fail_count++
sleep(collect_interval - 重试总耗时) // 等待剩余间隔
else:
// 异常情况:重试耗时超过了采集间隔
// 重置本轮错误状态(不算一次有效失败)
// 直接进入下一轮采集
不增加 fail_count
立即开始下一轮
if(fail_count >= collect_fail_alert_threshold): // 默认5次
创建告警(alert_type="collect_fail")
fail_count = 0 // 重置,避免重复告警
```
### 4.2 配置约束
| 配置项 | 默认值 | 说明 |
|--------|--------|------|
| `collect_retry_count` | 3 | 每轮最大重试次数 |
| `collect_retry_interval` | 30 | 重试间隔(秒) |
| `collect_fail_alert_threshold` | 5 | 连续失败N轮触发告警 |
**约束**:正常配置下 `retry_count × retry_interval < collect_interval`。如果不满足,重试超时不算失败。
### 4.3 fail_count 连续性
- 采集成功一次 → `fail_count` 立即清零
- 只有**连续**失败才累积
- 达到阈值触发告警后清零,避免同一问题重复告警
---
## 五、配置热更新
### 5.1 双保险机制
```
管理后台修改 → 写DB → HTTP通知采集服务 → 采集服务验证生效
┌─── 即时通知HTTP POST /api/reload───┐
│ │
管理后台写DB ──┤ ├──→ 采集服务重载配置
│ │
└─── 兜底轮询每30秒查DB─────────────────┘
```
### 5.2 重载流程
```
1. 管理后台修改采集地址/品牌模板 → 写入数据库
2. 管理后台调用采集服务API:
POST http://localhost:5800/api/reload
Headers: X-Api-Key: {collector_api_key}
3. 采集服务收到重载请求:
a. 标记"重载请求挂起",不立即中断
b. 等待所有地址当前轮采集完成
c. 从DB重新加载配置:
- cnc_collect_address启用的地址列表
- cnc_brand_field_mapping字段映射规则
- cnc_sys_config系统配置
d. 对比新旧配置,生成变更摘要
e. 调整运行中的任务:
- 新增的地址 → 启动新的Ping+采集Task
- 停用的地址 → 停止对应Task
- 变更的地址 → 停止旧Task启动新Task
f. 返回变更结果
4. 管理后台验证:
GET http://localhost:5800/api/config/hash
对比本地DB配置的hash → 一致则确认生效
```
### 5.3 配置快照
内存中维护的配置数据结构:
```csharp
class CollectorConfig
{
string ConfigHash; // MD5指纹快速判断是否变更
List<CollectAddress> Addresses; // 启用的采集地址列表
Dictionary<int, List<BrandFieldMapping>> BrandMappings; // brandId → 字段映射
Dictionary<string, string> SysConfig; // 系统配置键值对
DateTime LoadedAt; // 加载时间
}
```
### 5.4 管理后台交互
管理后台修改采集地址/品牌模板后:
- 保存按钮旁显示"已通知采集服务重载 ✓"或"通知失败将在30秒后自动同步 ⚠"
- 通过Web API后端转发HTTP请求到采集服务前端不直连5800端口
---
## 六、日终汇总
### 6.1 触发机制(启动时补检 + 内置定时器)
```
DailySummaryJob:
// 启动时自检
OnStart():
yesterday = DateTime.Today.AddDays(-1)
if(昨天尚未汇总):
立即执行昨天的汇总
// 定时检查(每分钟)
Timer(60秒):
now = DateTime.Now
targetDate = DateTime.Today.AddDays(-1) // 汇总昨天
if(now >= 配置的daily_summary_time): // 默认01:00
if(今天尚未执行过汇总):
执行汇总(targetDate)
记录"今天已汇总"
// HTTP API 手动触发
OnManualTrigger(date):
执行汇总(date) // 幂等:先删旧数据再重算
```
**优势**
- 服务01:00挂了02:00恢复后自动补上
- 手动触发支持重算任意日期
- 幂等设计:同一天汇总多次不会重复
### 6.2 汇总步骤
```
DailySummaryJob.Execute(targetDate):
for each 启用的机床:
// Step 1: 关闭该机床的所有活跃段
activeSegments = 查询(machine_id, is_settled=0, end_time=NULL)
for each segment in activeSegments:
CloseSegment(segment.id,
endPartCount = lastPartCount缓存或段内最大part_count,
quantity = endPartCount - segment.start_part_count,
closeReason = "end_of_day")
// Step 2: 标记结算
SettleByDate(targetDate)
// Step 3: 按程序合并 → cnc_daily_production
segments = 查询该机床targetDate的所有段
grouped = segments.GroupBy(s => s.program_name)
for each group:
UpsertDailyProduction(
machine_id, targetDate, program_name,
total_quantity = Sum(group.quantity),
segment_count = group.Count()
)
// Step 4: 写机床日状态
UpsertMachineDailyStatus(
machine_id, targetDate,
data_status = 根据当天采集情况判断:
"normal" → 至少成功采集过一次
"offline" → 当天从未Ping通
"data_missing" → Ping通但采集始终失败
)
// Step 5: 工人汇总 → cnc_worker_daily_summary
for each worker:
machines = 该工人绑定的机床列表
workerProductions = 查这些机床在targetDate的daily_production
UpsertWorkerDailySummary(
worker_id, targetDate,
total_quantity = Sum(workerProductions.total_quantity),
machine_count = 去重机床数,
program_count = 去重程序名数
)
// Step 6: 未开机机床插0产量记录
allEnabledMachines = 所有启用的机床
machinesWithData = 当天有daily_production记录的机床
for each machine in (allEnabledMachines - machinesWithData):
InsertDailyProduction(machine.id, targetDate, quantity=0)
UpsertMachineDailyStatus(machine.id, targetDate, "offline")
```
---
## 七、心跳上报
### 7.1 心跳机制
```
HeartbeatReporter:
每 heartbeat_interval 秒默认10秒可配:
写入 log_collector_heartbeat:
service_id = "CncCollector"
status = "running"
collect_address_id = null全局心跳
last_collect_time = 最近一次成功采集时间
success_count = 本轮成功采集次数
fail_count = 本轮失败采集次数
uptime_seconds = (NOW() - 服务启动时间).TotalSeconds
detail = JSON.stringify({
activeTasks: 活跃采集任务数,
activePings: 活跃Ping任务数,
configHash: 当前配置指纹
})
```
### 7.2 Web API判断服务状态
```sql
-- 管理后台/大屏查询采集服务是否在线
SELECT * FROM log_collector_heartbeat
WHERE service_id = 'CncCollector'
AND created_at > NOW() - INTERVAL 30 SECOND
ORDER BY created_at DESC LIMIT 1
-- 有记录 → 运行中;无记录 → 已停止
```
---
## 八、日志系统
### 8.1 双写策略
| 事件类型 | 文件日志 | DB日志(log_system) |
|----------|---------|-------------------|
| 每次采集成功 | ✅ INFO | ❌ 不写(量太大) |
| 采集失败/重试 | ✅ ERROR | ✅ ERROR |
| 字段缺失告警 | ✅ WARN | ❌写入cnc_alert |
| 未知设备告警 | ✅ WARN | ❌写入cnc_alert |
| 配置变更/重载 | ✅ INFO | ✅ INFO |
| 服务启停 | ✅ INFO | ✅ INFO |
| 日终汇总 | ✅ INFO | ✅ INFO |
| 异常崩溃 | ✅ FATAL | ✅ ERROR如果还能写的话 |
| Ping状态变化 | ✅ DEBUG | ❌ |
### 8.2 文件日志格式
```
logs/collector-2026-04-30.log
[2026-04-30 01:00:02 INFO ] 日终汇总开始目标日期2026-04-29
[2026-04-30 01:00:05 INFO ] 日终汇总完成处理160台机床耗时3.2秒
[2026-04-30 01:00:30 INFO ] 地址[FANUC-1号] 采集成功获取24台设备数据耗时120ms
[2026-04-30 01:01:00 WARN ] 机床[西-1.8](id=5) 必填字段缺失: spindle_speed_set
[2026-04-30 01:02:00 ERROR] 地址[FANUC-2号] 采集失败(第3次重试): 连接超时
[2026-04-30 01:02:05 INFO ] 配置重载完成新增地址1个停用地址1个更新映射3条
[2026-04-30 01:02:30 WARN ] 未知设备 device=fanake_1.15,采集地址[FANUC-1号]
```
### 8.3 log4net配置要点
```xml
<!-- App.config 中 log4net 配置 -->
<log4net>
<appender name="RollingFile" type="log4net.Appender.RollingFileAppender">
<file value="logs/collector" />
<datePattern value="-yyyy-MM-dd'.log'" />
<maximumFileSize value="50MB" />
<maxSizeRollBackups value="10" />
<layout type="log4net.Layout.PatternLayout">
<conversionPattern value="[%date %-5level] %message%newline" />
</layout>
</appender>
<root>
<level value="INFO" />
<appender-ref ref="RollingFile" />
</root>
</log4net>
```
---
## 九、优雅停止与异常恢复
### 9.1 优雅停止
```
核心原则:收到停止请求后,等待所有地址当前轮采集完成再退出
CollectorEngine.Stop():
1. 标记 _stopRequested = true
2. 日志记录"收到停止请求,等待当前轮采集完成"
3. 等待所有CollectorTask和PingTask完成带超时
Task.WaitAll(allTasks, timeout: 30秒)
4. 关闭所有活跃的产量分段:
for each 活跃段:
CloseSegment(reason="service_stop")
5. 写最终心跳(status="stopped")
6. 日志记录"服务已停止"
7. 释放资源
```
### 9.2 配置重载时的保护
```
CollectorEngine.Reload():
1. 标记 _reloadRequested = true
2. 日志记录"收到重载请求,等待当前轮采集完成"
3. 不中断任何正在执行的采集任务
4. 等待所有任务到达本轮结束点(循环顶部检查标记)
5. 从DB重新加载配置
6. 对比新旧配置,生成变更摘要
7. 停掉不再需要的任务
8. 启动新任务
9. 清除 _reloadRequested 标记
10. 返回变更摘要
```
### 9.3 异常停止(崩溃/强杀)的数据防护
#### 产量分段——天然事务边界
```
采集服务对产量分段的操作只有两种原子动作:
1. 开新段INSERT → end_time=NULL
2. 结旧段UPDATE → 设置end_time
最坏情况:结旧段成功但开新段失败
后果:机床没有活跃段
恢复:下次服务启动时自检恢复(见下)
数据丢失:最多丢失一次采集的产量,不会出现脏数据
```
#### 服务启动时自检恢复
```
CollectorEngine.OnStart():
1. 加载配置
2. 自检恢复:
activeSegments = 查询所有 is_settled=0 AND end_time=NULL 的段
for each segment:
if(segment.start_time 距今 > 2小时):
// 很可能是崩溃遗留的孤儿段
CloseSegment(segment.id,
endPartCount = segment.start_part_count,
quantity = 0,
closeReason = "service_crash",
endTime = segment.start_time.AddMinutes(5))
3. 启动Ping+采集任务
4. 自检日终汇总
```
#### 原始JSON先写后解析
```
采集流程中的写入顺序:
第1步写入 log_collect_raw原始JSON + is_success=1 ← 先保存
第2步逐台解析 + 写 cnc_collect_record ← 后处理
如果第2步中途崩溃:
- 原始JSON已安全保存
- 可通过管理API手动触发"重解析"按log_collect_raw的记录重新跑字段映射
```
#### 采集记录——天然原子性
```
每台设备的结构化数据是一条独立的 INSERT
即使中途崩溃,已写入的记录是完整的
未写入的记录只是缺失,下次采集会补上
```
---
## 十、管理API
### 10.1 端点列表
| 方法 | URL | 说明 | 认证 |
|------|-----|------|------|
| GET | /api/status | 服务状态(运行时长、任务数、各地址状态) | X-Api-Key |
| POST | /api/reload | 立即重载配置(等本轮完成后执行) | X-Api-Key |
| POST | /api/stop | 停止所有采集任务 | X-Api-Key |
| POST | /api/start | 启动所有采集任务 | X-Api-Key |
| GET | /api/tasks | 当前运行的任务列表 | X-Api-Key |
| GET | /api/config/snapshot | 当前生效的配置快照 | X-Api-Key |
| GET | /api/config/hash | 配置指纹MD5 | X-Api-Key |
| POST | /api/daily-summary | 手动触发日终汇总,参数: date=2026-04-29 | X-Api-Key |
| POST | /api/reparse | 重解析指定原始记录,参数: rawId=12345 | X-Api-Key |
### 10.2 认证
所有管理API请求需携带 Header `X-Api-Key`,值与 `cnc_sys_config` 表中的 `collector_api_key` 一致。
### 10.3 实现方式
使用 `System.Net.HttpListener` 自托管独立于IIS
```csharp
// ManagementServer.cs
HttpListener listener = new HttpListener();
listener.Prefixes.Add("http://+:{port}/api/");
listener.Start();
// 请求处理循环
while(running)
{
var context = listener.GetContext();
// 路由到 ManagementController 对应方法
}
```
### 10.4 响应格式
```json
{
"code": 0,
"message": "success",
"data": { ... }
}
```
与Web API统一格式。
---
## 十一、告警类型汇总
| alert_type | 触发条件 | detail内容 |
|------------|---------|------------|
| `collect_fail` | 连续N轮采集失败 | 失败原因摘要 |
| `collect_fail` | 必填字段缺失 | 缺失字段列表 |
| `unknown_device` | device匹配不到机床表 | 完整device JSON对象 |
| `device_offline` | 预留当前Ping状态直接更新机床表 | - |
| `service_error` | 采集服务内部异常 | 异常消息+堆栈 |
### 告警频率控制
- 同一台机床同一个问题5分钟内不重复告警
- 未知设备告警同一device编码1小时内不重复
- 告警表不记录采集成功的正常事件
---
## 十二、App.config 配置项
```xml
<configuration>
<connectionStrings>
<add name="BusinessConnection"
connectionString="Server=localhost;Database=cnc_business;Uid=root;Pwd=root;Charset=utf8mb4;SslMode=None;" />
<add name="LogConnection"
connectionString="Server=localhost;Database=cnc_log;Uid=root;Pwd=root;Charset=utf8mb4;SslMode=None;" />
</connectionStrings>
<appSettings>
<!-- 服务标识 -->
<add key="ServiceId" value="CncCollector" />
<!-- 管理API -->
<add key="ManagementApiPort" value="5800" />
<add key="CollectorApiKey" value="与cnc_sys_config.collector_api_key一致" />
<!-- 采集参数首次启动默认值后续从DB读取 -->
<add key="PingInterval" value="60" />
<add key="CollectRetryCount" value="3" />
<add key="CollectRetryInterval" value="30" />
<add key="CollectFailAlertThreshold" value="5" />
<add key="HeartbeatInterval" value="10" />
<add key="ConfigPollInterval" value="30" />
<!-- 日终汇总 -->
<add key="DailySummaryTime" value="01:00" />
<!-- 日志 -->
<add key="LogLevel" value="INFO" />
</appSettings>
</configuration>
```
---
## 十三、数据库交互清单
### 13.1 采集服务读取的表
| 表 | 库 | 用途 | 频率 |
|----|-----|------|------|
| cnc_collect_address | 业务 | 加载启用的采集地址列表 | 每30秒/重载时 |
| cnc_brand_field_mapping | 业务 | 加载品牌字段映射规则 | 每30秒/重载时 |
| cnc_brand | 业务 | 获取品牌device_field/tags_path | 每30秒/重载时 |
| cnc_machine | 业务 | 按device_code匹配机床 | 每次采集 |
| cnc_production_segment | 业务 | 查活跃段 | 每次采集每台机床 |
| cnc_worker_machine | 业务 | 查机床绑定的工人(日汇总用) | 日汇总时 |
| cnc_sys_config | 业务 | 读取系统配置 | 每30秒/重载时 |
| log_collect_raw | 日志 | 读取原始记录(重解析用) | 按需 |
### 13.2 采集服务写入的表
| 表 | 库 | 用途 | 频率 |
|----|-----|------|------|
| log_collect_raw | 日志 | 存原始JSON | 每次采集 |
| log_collector_heartbeat | 日志 | 心跳 | 每10秒 |
| log_system | 日志 | 关键事件日志 | 关键事件时 |
| cnc_collect_record | 业务 | 结构化采集记录 | 每次采集每台机床 |
| cnc_production_segment | 业务 | 开段/结段 | 程序切换/清零时 |
| cnc_daily_production | 业务 | 日汇总产量 | 日汇总时 |
| cnc_worker_daily_summary | 业务 | 工人日汇总 | 日汇总时 |
| cnc_machine_daily_status | 业务 | 机床日状态 | 日汇总时 |
| cnc_machine | 业务 | 更新实时状态字段 | 每次采集每台机床 |
| cnc_collect_address | 业务 | 更新last_collect_time/fail_count | 每次采集 |
| cnc_alert | 业务 | 告警记录 | 异常时 |
### 13.3 需要新增的Repository方法
以下方法在现有接口中不存在,需要新增:
```csharp
// ICollectRecordRepository新建
long Create(CollectRecord entity);
// IMachineRepository补充
void UpdateRealtimeStatus(int machineId, Machine realtimeFields);
// IDailyProductionRepository补充
int Upsert(DailyProduction entity); // INSERT ON DUPLICATE KEY UPDATE
void DeleteByMachineAndDate(int machineId, DateTime date);
// IWorkerDailySummaryRepository补充
int Upsert(WorkerDailySummary entity);
```
---
## 十四、部署与运维
### 14.1 安装为Windows Service
```bash
# 安装
installutil CncCollector.exe
# 卸载
installutil /u CncCollector.exe
# 或使用 sc 命令
sc create CncCollector binPath= "E:\opencode\haoliang\src\CncCollector\bin\Debug\CncCollector.exe"
sc start CncCollector
sc stop CncCollector
```
### 14.2 文件部署位置
```
E:\opencode\haoliang\src\CncCollector\bin\Debug\
├── CncCollector.exe
├── CncCollector.exe.config ← App.config 编译输出
├── CncModels.dll ← 引用的模型层
├── CncRepository.dll ← 引用的仓储层
├── Dapper.dll
├── MySqlConnector.dll
├── Newtonsoft.Json.dll
├── log4net.dll
└── logs\ ← 日志目录(自动创建)
```
### 14.3 与Web API共享数据库
- 采集服务和Web API同时读写同一个MariaDB
- 采集服务只写自己的表采集记录、产量段、日汇总等不会覆盖Web API管理的配置数据
- 配置变更通过DB传递双保险保证一致
### 14.4 防火墙
- 开放5800端口供管理API通信仅本机/局域网访问)
- 采集服务需要出站HTTP访问采集地址URL
- Ping需要ICMP出站权限
---
## 十五、开发顺序建议
| 步骤 | 内容 | 依赖 |
|------|------|------|
| 1 | 新建CncCollector项目 + 引用CncModels/CncRepository + App.config | 无 |
| 2 | FieldMapper + ValueConverter纯函数可独立测试 | 步骤1 |
| 3 | CollectorTask 单地址采集循环HTTP拉取+解析+写库) | 步骤2 |
| 4 | ProductionTracker 产量分段追踪 | 步骤1 |
| 5 | PingTask Ping循环 | 步骤1 |
| 6 | CollectorEngine 引擎启停所有Task | 步骤3+4+5 |
| 7 | HeartbeatReporter 心跳 | 步骤1 |
| 8 | ConfigProvider 配置热更新 | 步骤6 |
| 9 | DailySummaryJob 日终汇总 | 步骤4 |
| 10 | ManagementServer + ManagementController 管理API | 步骤6+8+9 |
| 11 | DualLogger 双写日志 | 步骤1 |
| 12 | ServiceEntry + ServiceInstaller Windows Service包装 | 步骤6+10 |
| 13 | 集成测试(完整链路) | 全部 |
建议先跑通 **步骤1→2→3** 这条最小链路(一个地址→拉取→解析→写库),再逐步叠加其他功能。