24 KiB
24 KiB
集成中心领域技术方案
领域编号: 4.6
微服务: ether-hub
最后更新: 2026-02-10
一、领域概述
1.1 领域职责
集成中心领域负责 Ether 平台与外部系统的对接集成:
- 外部系统适配(停车系统、门禁系统、政府监管平台)
- IoT设备接入与管理
- 数据同步与转换
- 集成日志记录与监控
1.2 核心概念
| 概念 | 说明 | 对应实体 |
|---|---|---|
| 集成适配器 | 外部系统连接配置 | IntegrationAdapter |
| 集成日志 | 接口调用记录 | IntegrationLog |
| IoT设备 | 物联网设备管理 | IoTDevice |
| 数据映射 | 字段转换规则 | DataMapping |
二、领域模型
2.1 聚合根设计
IntegrationAdapter(集成适配器)
@Entity
@Table(name = "hub_integration_adapter")
@Data
public class IntegrationAdapter {
@Id
private UUID id;
private UUID projectId;
private String name; // 适配器名称
private String systemType; // PARKING/ACCESS/GOVERNMENT/IOT
private String description; // 描述
// 协议配置
private String protocol; // HTTP/HTTPS/MQTT/COAP/SOAP
private String baseUrl; // 基础URL
private Integer timeout; // 超时时间(秒)
private Integer retryCount; // 重试次数
// 认证配置
private String authType; // NONE/API_KEY/OAUTH2/JWT/BASIC
private String authConfig; // 认证配置(JSON)
// API_KEY: {key, value, location: header/query}
// OAUTH2: {clientId, clientSecret, tokenUrl, scope}
// JWT: {secret, algorithm}
// BASIC: {username, password}
// 状态
private AdapterStatus status; // ONLINE/OFFLINE/ERROR
private LocalDateTime lastPingTime;
private String lastErrorMessage;
// 启用状态
private Boolean enabled;
// 审计字段
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
}
// 适配器状态枚举
public enum AdapterStatus {
ONLINE("在线"),
OFFLINE("离线"),
ERROR("错误");
}
IntegrationLog(集成日志)
@Entity
@Table(name = "hub_integration_log")
@Data
public class IntegrationLog {
@Id
private UUID id;
private UUID adapterId;
// 请求信息
private String requestId; // 请求唯一标识
private String operation; // 操作类型
private String requestMethod; // GET/POST/PUT/DELETE
private String requestUrl; // 请求URL
private String requestHeaders; // 请求头(JSON)
private String requestBody; // 请求体
private LocalDateTime requestTime;
// 响应信息
private String responseBody; // 响应体
private Integer httpStatus; // HTTP状态码
private String responseHeaders;// 响应头(JSON)
private LocalDateTime responseTime;
// 执行信息
private Long durationMs; // 耗时(ms)
// 状态
private LogStatus status; // SUCCESS/FAIL/TIMEOUT
private String errorMessage; // 错误信息
private String errorStack; // 错误堆栈
// 审计字段
private LocalDateTime createdAt;
}
// 日志状态枚举
public enum LogStatus {
SUCCESS("成功"),
FAIL("失败"),
TIMEOUT("超时");
}
IoTDevice(IoT设备)
@Entity
@Table(name = "hub_iot_device")
@Data
public class IoTDevice {
@Id
private UUID id;
private UUID projectId;
// 设备信息
private String deviceId; // 设备唯一标识
private String deviceName; // 设备名称
private String deviceType; // 设备类型: SENSOR/CAMERA/GATE/...
private String protocol; // MQTT/CoAP/HTTP/HTTPS
// 关联信息
private UUID equipmentId; // 关联设备台账
private UUID spaceNodeId; // 关联空间节点
// MQTT配置
private String mqttBroker; // MQTT服务器地址
private Integer mqttPort; // MQTT端口
private String mqttTopic; // 订阅主题
private String mqttClientId; // 客户端ID
// 最后数据
private String lastPayload; // 最后上报数据
private LocalDateTime lastReportTime;
// 状态
private DeviceStatus status; // ONLINE/OFFLINE/FAULT
private LocalDateTime lastOnlineTime;
// 审计字段
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
}
// 设备状态枚举
public enum DeviceStatus {
ONLINE("在线"),
OFFLINE("离线"),
FAULT("故障");
}
DataMapping(数据映射)
@Entity
@Table(name = "hub_data_mapping")
@Data
public class DataMapping {
@Id
private UUID id;
private UUID projectId;
private UUID adapterId; // 关联适配器
private String name; // 映射名称
private String entityType; // 实体类型: WORK_ORDER/EQUIPMENT/...
// 字段映射
private String fieldMappings; // [{"source": "orderNo", "target": "工单编号", "type": "STRING"}]
// 转换规则
private String transformRules; // 数据转换规则(JSON)
// 启用状态
private Boolean enabled;
// 审计字段
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
}
三、集成场景
3.1 停车系统集成
@Service
public class ParkingIntegrationService {
@Autowired
private IntegrationAdapterRepository adapterRepository;
@Autowired
private IntegrationLogRepository logRepository;
// 同步车位状态
public void syncParkingSpaceStatus(UUID projectId) {
IntegrationAdapter adapter = adapterRepository
.findByProjectIdAndSystemType(projectId, "PARKING")
.orElseThrow(() -> new NotFoundException("停车系统适配器未配置"));
try {
// 调用停车系统API
String response = callExternalApi(adapter, "/api/parking/spaces", "GET", null);
// 解析响应
List<ParkingSpaceDTO> spaces = parseResponse(response);
// 更新本地车位状态
for (ParkingSpaceDTO space : spaces) {
updateLocalParkingSpace(space);
}
} catch (Exception e) {
log.error("同步车位状态失败", e);
throw new IntegrationException("同步车位状态失败: " + e.getMessage());
}
}
// 车牌识别通知
public void handlePlateRecognition(PlateRecognitionEvent event) {
// 处理车牌识别结果
String plateNumber = event.getPlateNumber();
// 查询访客预约
Optional<VisitorAppointment> appointment = visitorService
.findByPlateNumber(plateNumber);
if (appointment.isPresent()) {
// 访客车辆,自动抬杆
sendOpenGateCommand(event.getGateId());
// 记录通行
recordVehicleAccess(plateNumber, event.getGateId(), "VISITOR");
} else {
// 业主车辆,查询业主信息
Optional<Owner> owner = ownerService.findByPlateNumber(plateNumber);
if (owner.isPresent()) {
sendOpenGateCommand(event.getGateId());
recordVehicleAccess(plateNumber, event.getGateId(), "OWNER");
} else {
// 陌生车辆,记录并提示
recordVehicleAccess(plateNumber, event.getGateId(), "UNKNOWN");
}
}
}
}
3.2 门禁系统集成
@Service
public class AccessIntegrationService {
// 下发访客二维码到门禁
public void syncVisitorCredentialToAccess(UUID credentialId) {
VisitorCredential credential = credentialRepository.findById(credentialId)
.orElseThrow(() -> new NotFoundException("凭证不存在"));
// 查询门禁系统适配器
IntegrationAdapter adapter = adapterRepository
.findByProjectIdAndSystemType(credential.getProjectId(), "ACCESS")
.orElseThrow(() -> new NotFoundException("门禁系统适配器未配置"));
// 构建下发数据
AccessCredentialDTO dto = new AccessCredentialDTO();
dto.setCredentialCode(credential.getCredentialCode());
dto.setQrCode(credential.getQrCode());
dto.setExpireTime(credential.getExpireTime());
dto.setAccessGates(parseAccessGates(credential.getAccessGates()));
// 调用门禁系统API
callExternalApi(adapter, "/api/access/credentials", "POST", dto);
}
// 接收门禁通行记录
@PostMapping("/webhook/access/record")
public void receiveAccessRecord(@RequestBody AccessRecordDTO record) {
// 保存通行记录
VisitorAccessRecord accessRecord = new VisitorAccessRecord();
accessRecord.setId(UUID.randomUUID());
accessRecord.setCredentialCode(record.getCredentialCode());
accessRecord.setAccessTime(record.getAccessTime());
accessRecord.setGateId(record.getGateId());
accessRecord.setAccessType(record.getAccessType()); // ENTRY/EXIT
accessRecord.setPhotoUrl(record.getPhotoUrl());
accessRecordRepository.save(accessRecord);
// 更新凭证使用记录
credentialService.updateCredentialUsage(record.getCredentialCode());
// 通知被访人
notificationService.notifyHost(accessRecord);
}
}
3.3 IoT设备接入
@Service
public class IoTDeviceService {
@Autowired
private MqttClient mqttClient;
// 设备数据监听
@EventListener
public void onDeviceMessage(DeviceMessageEvent event) {
String deviceId = event.getDeviceId();
String payload = event.getPayload();
// 查询设备
IoTDevice device = deviceRepository.findByDeviceId(deviceId)
.orElse(null);
if (device == null) {
log.warn("未知设备: {}", deviceId);
return;
}
// 更新设备状态
device.setLastPayload(payload);
device.setLastReportTime(LocalDateTime.now());
device.setStatus(DeviceStatus.ONLINE);
device.setLastOnlineTime(LocalDateTime.now());
deviceRepository.save(device);
// 解析数据
DeviceData data = parseDeviceData(device.getDeviceType(), payload);
// 根据设备类型处理
switch (device.getDeviceType()) {
case "TEMPERATURE_SENSOR":
handleTemperatureData(device, data);
break;
case "HUMIDITY_SENSOR":
handleHumidityData(device, data);
break;
case "SMOKE_DETECTOR":
handleSmokeAlarm(device, data);
break;
case "WATER_LEAK_DETECTOR":
handleWaterLeakAlarm(device, data);
break;
default:
log.info("未处理的设备类型: {}", device.getDeviceType());
}
}
// 处理烟雾报警
private void handleSmokeAlarm(IoTDevice device, DeviceData data) {
Boolean smokeDetected = data.getBoolean("smokeDetected");
if (smokeDetected) {
// 创建紧急工单
WorkOrder workOrder = new WorkOrder();
workOrder.setOrderType(WorkOrderType.EMERGENCY);
workOrder.setTitle("烟雾报警: " + device.getDeviceName());
workOrder.setDescription("设备位置: " + device.getSpaceNodeId() +
"\n报警时间: " + LocalDateTime.now());
workOrder.setPriority(WorkOrderPriority.URGENT);
workOrder.setSource(WorkOrderSource.IOT);
workOrder.setEquipmentId(device.getEquipmentId());
workOrder.setSpaceNodeId(device.getSpaceNodeId());
workOrderService.create(workOrder);
// 发送紧急通知
notificationService.sendEmergencyAlert(device, "烟雾报警");
}
}
// 下发命令到设备
public void sendCommandToDevice(String deviceId, String command, Map<String, Object> params) {
IoTDevice device = deviceRepository.findByDeviceId(deviceId)
.orElseThrow(() -> new NotFoundException("设备不存在"));
// 构建MQTT消息
MqttMessage message = new MqttMessage();
message.setTopic(device.getMqttTopic() + "/command");
Map<String, Object> payload = new HashMap<>();
payload.put("command", command);
payload.put("params", params);
payload.put("timestamp", System.currentTimeMillis());
message.setPayload(JsonUtils.toJson(payload).getBytes());
// 发送消息
mqttClient.publish(message);
}
}
四、集成适配器框架
4.1 适配器接口
// 集成适配器接口
public interface IntegrationAdapterClient {
// 发送请求
String sendRequest(String operation, String method, String path,
Map<String, String> headers, Object body);
// 健康检查
boolean healthCheck();
// 获取适配器信息
AdapterInfo getAdapterInfo();
}
// HTTP适配器实现
@Component
public class HttpIntegrationAdapter implements IntegrationAdapterClient {
@Autowired
private RestTemplate restTemplate;
private IntegrationAdapter config;
public HttpIntegrationAdapter(IntegrationAdapter config) {
this.config = config;
}
@Override
public String sendRequest(String operation, String method, String path,
Map<String, String> headers, Object body) {
String url = config.getBaseUrl() + path;
// 构建请求头
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
// 添加认证信息
addAuthenticationHeaders(httpHeaders);
// 添加自定义头
if (headers != null) {
headers.forEach(httpHeaders::add);
}
// 构建请求体
HttpEntity<Object> entity = new HttpEntity<>(body, httpHeaders);
// 发送请求
ResponseEntity<String> response = restTemplate.exchange(
url,
HttpMethod.valueOf(method),
entity,
String.class
);
return response.getBody();
}
private void addAuthenticationHeaders(HttpHeaders headers) {
String authType = config.getAuthType();
String authConfig = config.getAuthConfig();
switch (authType) {
case "API_KEY":
Map<String, String> apiKeyConfig = JsonUtils.fromJson(authConfig, Map.class);
String location = apiKeyConfig.get("location");
if ("header".equals(location)) {
headers.add(apiKeyConfig.get("key"), apiKeyConfig.get("value"));
}
break;
case "OAUTH2":
String token = getOAuth2Token(authConfig);
headers.setBearerAuth(token);
break;
case "BASIC":
Map<String, String> basicConfig = JsonUtils.fromJson(authConfig, Map.class);
String credentials = basicConfig.get("username") + ":" + basicConfig.get("password");
String encoded = Base64.getEncoder().encodeToString(credentials.getBytes());
headers.setBasicAuth(encoded);
break;
}
}
@Override
public boolean healthCheck() {
try {
sendRequest("healthCheck", "GET", "/health", null, null);
return true;
} catch (Exception e) {
return false;
}
}
}
4.2 集成日志记录
@Aspect
@Component
public class IntegrationLogAspect {
@Autowired
private IntegrationLogRepository logRepository;
@Around("execution(* com.ether.hub.client.IntegrationAdapterClient.*(..))")
public Object around(ProceedingJoinPoint point) throws Throwable {
// 获取方法参数
Object[] args = point.getArgs();
String operation = (String) args[0];
String method = (String) args[1];
String path = (String) args[2];
Map<String, String> headers = (Map<String, String>) args[3];
Object body = args[4];
// 创建日志
IntegrationLog log = new IntegrationLog();
log.setId(UUID.randomUUID());
log.setRequestId(UUID.randomUUID().toString());
log.setOperation(operation);
log.setRequestMethod(method);
log.setRequestUrl(path);
log.setRequestHeaders(JsonUtils.toJson(headers));
log.setRequestBody(JsonUtils.toJson(body));
log.setRequestTime(LocalDateTime.now());
long startTime = System.currentTimeMillis();
try {
// 执行目标方法
Object result = point.proceed();
// 记录成功
log.setResponseBody((String) result);
log.setHttpStatus(200);
log.setStatus(LogStatus.SUCCESS);
return result;
} catch (Exception e) {
// 记录失败
log.setStatus(LogStatus.FAIL);
log.setErrorMessage(e.getMessage());
throw e;
} finally {
log.setDurationMs(System.currentTimeMillis() - startTime);
log.setResponseTime(LocalDateTime.now());
log.setCreatedAt(LocalDateTime.now());
// 异步保存日志
logRepository.save(log);
}
}
}
五、API 接口
@RestController
@RequestMapping("/api/v1/hub")
@Tag(name = "集成中心")
public class IntegrationController {
// 适配器管理
@PostMapping("/adapters")
@Operation(summary = "创建适配器")
public Result<IntegrationAdapterVO> createAdapter(@RequestBody @Valid AdapterCreateRequest request);
@GetMapping("/adapters")
@Operation(summary = "查询适配器列表")
public Result<List<IntegrationAdapterVO>> listAdapters();
@GetMapping("/adapters/{id}")
@Operation(summary = "获取适配器详情")
public Result<IntegrationAdapterVO> getAdapter(@PathVariable UUID id);
@PutMapping("/adapters/{id}")
@Operation(summary = "更新适配器")
public Result<IntegrationAdapterVO> updateAdapter(@PathVariable UUID id,
@RequestBody @Valid AdapterUpdateRequest request);
@DeleteMapping("/adapters/{id}")
@Operation(summary = "删除适配器")
public Result<Void> deleteAdapter(@PathVariable UUID id);
@PostMapping("/adapters/{id}/test")
@Operation(summary = "测试连接")
public Result<ConnectionTestResult> testConnection(@PathVariable UUID id);
// 集成日志
@GetMapping("/logs")
@Operation(summary = "查询集成日志")
public Result<Page<IntegrationLogVO>> pageLogs(IntegrationLogQueryRequest request);
// IoT设备管理
@PostMapping("/iot-devices")
@Operation(summary = "注册IoT设备")
public Result<IoTDeviceVO> registerDevice(@RequestBody @Valid IoTDeviceRegisterRequest request);
@GetMapping("/iot-devices")
@Operation(summary = "查询设备列表")
public Result<List<IoTDeviceVO>> listDevices();
@PostMapping("/iot-devices/{id}/command")
@Operation(summary = "发送设备命令")
public Result<Void> sendDeviceCommand(@PathVariable UUID id,
@RequestBody DeviceCommandRequest request);
// 数据映射
@PostMapping("/data-mappings")
@Operation(summary = "创建数据映射")
public Result<DataMappingVO> createDataMapping(@RequestBody @Valid DataMappingCreateRequest request);
@PostMapping("/data-mappings/{id}/transform")
@Operation(summary = "测试数据转换")
public Result<Map<String, Object>> testTransform(@PathVariable UUID id,
@RequestBody Map<String, Object> sourceData);
}
六、实现状态与差异
6.1 实现状态
| 功能模块 | 实现状态 | 备注 |
|---|---|---|
| IntegrationAdapter | 🔴 未实现 | 待开发 |
| IntegrationLog | 🔴 未实现 | 待开发 |
| IoTDevice | 🔴 未实现 | 待开发 |
| DataMapping | 🔴 未实现 | 待开发 |
| 停车系统集成 | 🔴 未实现 | 待开发 |
| 门禁系统集成 | 🔴 未实现 | 待开发 |
| IoT接入 | 🔴 未实现 | 待开发 |
6.2 与设计方案的差异
| 设计项 | 设计方案 | 现有实现 | 差异分析 |
|---|---|---|---|
| ether-hub服务 | 完整实现 | 未创建 | 🔴 整个服务缺失 |
| 外部系统集成 | 停车/门禁/政府 | 未实现 | 🔴 功能缺失 |
| IoT接入 | MQTT/CoAP | 未实现 | 🔴 功能缺失 |
6.3 改进计划
| 优先级 | 改进项 | 说明 |
|---|---|---|
| P2 | 创建ether-hub服务 | 新建集成中心微服务 |
| P3 | 实现停车集成 | 对接停车系统API |
| P3 | 实现门禁集成 | 对接门禁系统 |
| P3 | 实现IoT接入 | MQTT Broker配置 |
七、数据库表结构
-- 集成适配器表
CREATE TABLE hub_integration_adapter (
id UUID PRIMARY KEY,
project_id UUID NOT NULL,
name VARCHAR(100) NOT NULL,
system_type VARCHAR(50) NOT NULL,
description VARCHAR(500),
protocol VARCHAR(20) NOT NULL,
base_url VARCHAR(255) NOT NULL,
timeout INTEGER DEFAULT 30,
retry_count INTEGER DEFAULT 3,
auth_type VARCHAR(20) NOT NULL,
auth_config JSONB,
status VARCHAR(20) DEFAULT 'OFFLINE',
last_ping_time TIMESTAMP,
last_error_message TEXT,
enabled BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
-- 集成日志表
CREATE TABLE hub_integration_log (
id UUID PRIMARY KEY,
adapter_id UUID NOT NULL,
request_id VARCHAR(50) NOT NULL,
operation VARCHAR(100) NOT NULL,
request_method VARCHAR(10) NOT NULL,
request_url VARCHAR(255) NOT NULL,
request_headers JSONB,
request_body TEXT,
request_time TIMESTAMP NOT NULL,
response_body TEXT,
http_status INTEGER,
response_headers JSONB,
response_time TIMESTAMP,
duration_ms BIGINT,
status VARCHAR(20) NOT NULL,
error_message TEXT,
error_stack TEXT,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
-- IoT设备表
CREATE TABLE hub_iot_device (
id UUID PRIMARY KEY,
project_id UUID NOT NULL,
device_id VARCHAR(100) NOT NULL,
device_name VARCHAR(100) NOT NULL,
device_type VARCHAR(50) NOT NULL,
protocol VARCHAR(20) NOT NULL,
equipment_id UUID,
space_node_id UUID,
mqtt_broker VARCHAR(255),
mqtt_port INTEGER,
mqtt_topic VARCHAR(255),
mqtt_client_id VARCHAR(100),
last_payload TEXT,
last_report_time TIMESTAMP,
status VARCHAR(20) DEFAULT 'OFFLINE',
last_online_time TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
UNIQUE(project_id, device_id)
);
-- 数据映射表
CREATE TABLE hub_data_mapping (
id UUID PRIMARY KEY,
project_id UUID NOT NULL,
adapter_id UUID,
name VARCHAR(100) NOT NULL,
entity_type VARCHAR(50) NOT NULL,
field_mappings JSONB,
transform_rules JSONB,
enabled BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
-- 创建索引
CREATE INDEX idx_adapter_project ON hub_integration_adapter(project_id);
CREATE INDEX idx_adapter_system ON hub_integration_adapter(system_type);
CREATE INDEX idx_adapter_status ON hub_integration_adapter(status);
CREATE INDEX idx_log_adapter ON hub_integration_log(adapter_id);
CREATE INDEX idx_log_request ON hub_integration_log(request_id);
CREATE INDEX idx_log_created ON hub_integration_log(created_at);
CREATE INDEX idx_iot_device_project ON hub_iot_device(project_id);
CREATE INDEX idx_iot_device_status ON hub_iot_device(status);
CREATE INDEX idx_mapping_project ON hub_data_mapping(project_id);
文档维护: 本领域技术方案由 ether-hub 服务负责人维护