feat(base): B6 ProjectCreatedEvent 消费者预置主数据
监听 queue.property.project.created,消费项目创建事件: - Redis SETNX + mq_consume_log 双层幂等(与 NotifyConsumer 一致) - 可配置默认能力包预置(pms.project.init.default-packages,空则跳过) - Lookup 字典为系统级默认可见,无需项目级复制 - 手动 ACK:成功/幂等命中 basicAck,失败 basicNack(deliveryTag,false,false) 不 requeue RabbitMQConfig 扩展:声明 projectCreatedQueue + projectCreatedBinding。 6 个测试场景全部通过(happy path×2、幂等×2、异常×2)。 Refs: docs/plans/2026-07-03-003-deferred-items-roadmap-plan.md (B6)
This commit is contained in:
parent
7d0c5cb9cc
commit
f71bbaafc0
|
|
@ -1,14 +1,18 @@
|
|||
package com.pms.base.config;
|
||||
|
||||
import com.pms.common.constant.MQConstants;
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.core.QueueBuilder;
|
||||
import org.springframework.amqp.core.TopicExchange;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* RabbitMQ 配置(base-service 生产者端)
|
||||
* RabbitMQ 配置(base-service)
|
||||
* <p>
|
||||
* 仅声明交换机,队列由消费方(notify-service)自行声明。
|
||||
* 声明交换机(生产者端)+ 项目创建事件队列与绑定(消费者端,B6)。
|
||||
*/
|
||||
@Configuration
|
||||
public class RabbitMQConfig {
|
||||
|
|
@ -20,4 +24,22 @@ public class RabbitMQConfig {
|
|||
public TopicExchange notifyExchange() {
|
||||
return new TopicExchange(MQConstants.EXCHANGE_NOTIFY, true, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* 项目创建事件队列(B6 消费端)
|
||||
*/
|
||||
@Bean
|
||||
public Queue projectCreatedQueue() {
|
||||
return QueueBuilder.durable(MQConstants.QUEUE_PROJECT_CREATED).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 绑定项目创建队列到通知交换机
|
||||
*/
|
||||
@Bean
|
||||
public Binding projectCreatedBinding(TopicExchange notifyExchange) {
|
||||
return BindingBuilder.bind(projectCreatedQueue())
|
||||
.to(notifyExchange)
|
||||
.with(MQConstants.ROUTING_PROJECT_CREATED);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,127 @@
|
|||
package com.pms.base.consumer;
|
||||
|
||||
import com.pms.base.dto.event.ProjectCreatedEvent;
|
||||
import com.pms.base.service.ProjectInitService;
|
||||
import com.pms.common.entity.MqConsumeLog;
|
||||
import com.pms.common.mapper.MqConsumeLogMapper;
|
||||
import com.pms.common.util.JsonUtils;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 项目创建事件消费者(B6)
|
||||
* <p>
|
||||
* 监听 {@link ProjectCreatedEvent},为新项目预置主数据:
|
||||
* <ul>
|
||||
* <li>Lookup 字典:系统级字典默认对所有项目可见({@code getItems(dictCode, projectId)} 自动合并),无需复制</li>
|
||||
* <li>能力包关联:根据配置 {@code pms.project.init.default-packages} 自动关联默认能力包,空则跳过</li>
|
||||
* </ul>
|
||||
* <p>
|
||||
* 幂等:Redis SETNX 快速路径 + mq_consume_log DB 兜底,与 NotifyConsumer/AuditConsumer 一致。
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class ProjectCreatedConsumer {
|
||||
|
||||
private static final String CONSUMER_GROUP = "base-project-init-consumer";
|
||||
private static final String IDEMPOTENT_KEY_PREFIX = "idem:project:created:";
|
||||
|
||||
private final StringRedisTemplate stringRedisTemplate;
|
||||
private final MqConsumeLogMapper mqConsumeLogMapper;
|
||||
private final ProjectInitService projectInitService;
|
||||
|
||||
@Value("${pms.project.init.default-packages:}")
|
||||
private String defaultPackagesCsv;
|
||||
|
||||
@org.springframework.amqp.rabbit.annotation.RabbitListener(queues = "${pms.mq.queue.project-created:queue.property.project.created}")
|
||||
public void onProjectCreated(String message, Channel channel, Message amqpMessage) throws java.io.IOException {
|
||||
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
|
||||
try {
|
||||
ProjectCreatedEvent event = JsonUtils.fromJson(message, ProjectCreatedEvent.class);
|
||||
if (event == null || event.getProjectId() == null) {
|
||||
log.warn("项目创建消息解析失败,丢弃: message={}", message);
|
||||
channel.basicAck(deliveryTag, false);
|
||||
return;
|
||||
}
|
||||
|
||||
String eventId = "PROJECT_CREATED:" + event.getProjectId();
|
||||
|
||||
// 1. Redis 快速幂等
|
||||
String idempotentKey = IDEMPOTENT_KEY_PREFIX + eventId;
|
||||
Boolean isNew = stringRedisTemplate.opsForValue().setIfAbsent(idempotentKey, "1", 24, TimeUnit.HOURS);
|
||||
if (Boolean.FALSE.equals(isNew)) {
|
||||
log.info("项目创建事件已消费过(Redis),跳过: projectId={}", event.getProjectId());
|
||||
channel.basicAck(deliveryTag, false);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. DB 持久化幂等兜底
|
||||
if (mqConsumeLogMapper.countByEventIdAndGroup(eventId, CONSUMER_GROUP) > 0) {
|
||||
log.info("项目创建事件已消费过(DB),跳过: projectId={}", event.getProjectId());
|
||||
channel.basicAck(deliveryTag, false);
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 预置主数据
|
||||
presetMainData(event);
|
||||
|
||||
// 4. 记录消费成功
|
||||
MqConsumeLog consumeLog = new MqConsumeLog();
|
||||
consumeLog.setEventId(eventId);
|
||||
consumeLog.setConsumerGroup(CONSUMER_GROUP);
|
||||
consumeLog.setQueueName("queue.property.project.created");
|
||||
consumeLog.setConsumeTime(LocalDateTime.now());
|
||||
consumeLog.setStatus("SUCCESS");
|
||||
mqConsumeLogMapper.insert(consumeLog);
|
||||
|
||||
channel.basicAck(deliveryTag, false);
|
||||
log.info("项目创建事件消费成功: projectId={}, projectName={}", event.getProjectId(), event.getProjectName());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("项目创建事件消费失败: message={}", message, e);
|
||||
channel.basicNack(deliveryTag, false, false);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 预置主数据
|
||||
* <p>
|
||||
* Lookup 字典:系统级字典通过 {@code getItems(dictCode, projectId)} 自动对项目可见,无需复制。
|
||||
* 能力包:根据 {@code pms.project.init.default-packages} 配置自动关联。空则跳过(等待管理员手动配置)。
|
||||
*/
|
||||
private void presetMainData(ProjectCreatedEvent event) {
|
||||
// Lookup 字典:系统级默认对所有项目可见,无需预置
|
||||
log.info("Lookup 字典:系统级默认可见,无需项目级预置: projectId={}", event.getProjectId());
|
||||
|
||||
// 能力包:根据配置自动关联
|
||||
List<String> packages = parseDefaultPackages();
|
||||
if (packages.isEmpty()) {
|
||||
log.info("未配置默认能力包,跳过自动关联: projectId={}(管理员可通过 ProjectInitService 手动配置)", event.getProjectId());
|
||||
return;
|
||||
}
|
||||
projectInitService.initProject(event.getProjectId(), packages);
|
||||
log.info("默认能力包关联完成: projectId={}, packages={}", event.getProjectId(), packages);
|
||||
}
|
||||
|
||||
private List<String> parseDefaultPackages() {
|
||||
if (defaultPackagesCsv == null || defaultPackagesCsv.isBlank()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return Arrays.stream(defaultPackagesCsv.split(","))
|
||||
.map(String::trim)
|
||||
.filter(s -> !s.isEmpty())
|
||||
.toList();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,196 @@
|
|||
package com.pms.base.consumer;
|
||||
|
||||
import com.pms.base.dto.event.ProjectCreatedEvent;
|
||||
import com.pms.base.service.ProjectInitService;
|
||||
import com.pms.common.entity.MqConsumeLog;
|
||||
import com.pms.common.mapper.MqConsumeLogMapper;
|
||||
import com.pms.common.util.JsonUtils;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.DisplayName;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.mockito.junit.jupiter.MockitoSettings;
|
||||
import org.mockito.quality.Strictness;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.core.MessageProperties;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.data.redis.core.ValueOperations;
|
||||
import org.springframework.test.util.ReflectionTestUtils;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
/**
|
||||
* ProjectCreatedConsumer 单元测试(B6)
|
||||
* <p>
|
||||
* 覆盖场景:
|
||||
* <ul>
|
||||
* <li>Happy path: 新事件 + 默认能力包已配置 → 调用 initProject + ACK</li>
|
||||
* <li>Happy path: 新事件 + 未配置默认能力包 → 跳过 initProject + ACK</li>
|
||||
* <li>幂等命中(Redis): SETNX 返回 false → ACK 不处理</li>
|
||||
* <li>幂等命中(DB): countByEventIdAndGroup > 0 → ACK 不处理</li>
|
||||
* <li>异常: projectId 为 null → ACK 丢弃</li>
|
||||
* <li>异常: initProject 抛异常 → NACK</li>
|
||||
* </ul>
|
||||
*/
|
||||
@DisplayName("ProjectCreatedConsumer B6 测试")
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
@MockitoSettings(strictness = Strictness.LENIENT)
|
||||
class ProjectCreatedConsumerTest {
|
||||
|
||||
@Mock
|
||||
private StringRedisTemplate redisTemplate;
|
||||
@Mock
|
||||
private ValueOperations<String, String> valueOperations;
|
||||
@Mock
|
||||
private MqConsumeLogMapper mqConsumeLogMapper;
|
||||
@Mock
|
||||
private ProjectInitService projectInitService;
|
||||
@Mock
|
||||
private Channel channel;
|
||||
@Mock
|
||||
private Message amqpMessage;
|
||||
|
||||
@InjectMocks
|
||||
private ProjectCreatedConsumer consumer;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
MessageProperties props = new MessageProperties();
|
||||
props.setDeliveryTag(1L);
|
||||
when(amqpMessage.getMessageProperties()).thenReturn(props);
|
||||
when(redisTemplate.opsForValue()).thenReturn(valueOperations);
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("Happy path: 新事件 + 默认能力包已配置 → 调用 initProject + ACK")
|
||||
void shouldInitProjectWhenDefaultPackagesConfigured() throws Exception {
|
||||
// Given
|
||||
ReflectionTestUtils.setField(consumer, "defaultPackagesCsv", "RESIDENTIAL");
|
||||
when(valueOperations.setIfAbsent(anyString(), anyString(), anyLong(), any(TimeUnit.class)))
|
||||
.thenReturn(true);
|
||||
when(mqConsumeLogMapper.countByEventIdAndGroup(anyString(), anyString())).thenReturn(0);
|
||||
|
||||
ProjectCreatedEvent event = new ProjectCreatedEvent();
|
||||
event.setProjectId(1001L);
|
||||
event.setProjectName("测试项目");
|
||||
event.setProjectCode("TEST001");
|
||||
event.setCreatedAt(System.currentTimeMillis());
|
||||
String message = JsonUtils.toJson(event);
|
||||
|
||||
// When
|
||||
consumer.onProjectCreated(message, channel, amqpMessage);
|
||||
|
||||
// Then
|
||||
verify(projectInitService).initProject(eq(1001L), eq(List.of("RESIDENTIAL")));
|
||||
verify(mqConsumeLogMapper).insert(any(MqConsumeLog.class));
|
||||
verify(channel).basicAck(1L, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("Happy path: 新事件 + 未配置默认能力包 → 跳过 initProject + ACK")
|
||||
void shouldSkipInitWhenNoDefaultPackages() throws Exception {
|
||||
// Given
|
||||
ReflectionTestUtils.setField(consumer, "defaultPackagesCsv", "");
|
||||
when(valueOperations.setIfAbsent(anyString(), anyString(), anyLong(), any(TimeUnit.class)))
|
||||
.thenReturn(true);
|
||||
when(mqConsumeLogMapper.countByEventIdAndGroup(anyString(), anyString())).thenReturn(0);
|
||||
|
||||
ProjectCreatedEvent event = new ProjectCreatedEvent();
|
||||
event.setProjectId(1002L);
|
||||
event.setProjectName("测试项目2");
|
||||
String message = JsonUtils.toJson(event);
|
||||
|
||||
// When
|
||||
consumer.onProjectCreated(message, channel, amqpMessage);
|
||||
|
||||
// Then
|
||||
verify(projectInitService, never()).initProject(anyLong(), anyList());
|
||||
verify(channel).basicAck(1L, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("幂等命中(Redis): SETNX 返回 false → ACK 不处理")
|
||||
void shouldSkipWhenRedisIdempotentHit() throws Exception {
|
||||
// Given
|
||||
when(valueOperations.setIfAbsent(anyString(), anyString(), anyLong(), any(TimeUnit.class)))
|
||||
.thenReturn(false);
|
||||
|
||||
ProjectCreatedEvent event = new ProjectCreatedEvent();
|
||||
event.setProjectId(1003L);
|
||||
String message = JsonUtils.toJson(event);
|
||||
|
||||
// When
|
||||
consumer.onProjectCreated(message, channel, amqpMessage);
|
||||
|
||||
// Then
|
||||
verify(projectInitService, never()).initProject(anyLong(), anyList());
|
||||
verify(mqConsumeLogMapper, never()).insert(any(MqConsumeLog.class));
|
||||
verify(channel).basicAck(1L, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("幂等命中(DB): count > 0 → ACK 不处理")
|
||||
void shouldSkipWhenDbIdempotentHit() throws Exception {
|
||||
// Given
|
||||
when(valueOperations.setIfAbsent(anyString(), anyString(), anyLong(), any(TimeUnit.class)))
|
||||
.thenReturn(true);
|
||||
when(mqConsumeLogMapper.countByEventIdAndGroup(anyString(), anyString())).thenReturn(1);
|
||||
|
||||
ProjectCreatedEvent event = new ProjectCreatedEvent();
|
||||
event.setProjectId(1004L);
|
||||
String message = JsonUtils.toJson(event);
|
||||
|
||||
// When
|
||||
consumer.onProjectCreated(message, channel, amqpMessage);
|
||||
|
||||
// Then
|
||||
verify(projectInitService, never()).initProject(anyLong(), anyList());
|
||||
verify(mqConsumeLogMapper, never()).insert(any(MqConsumeLog.class));
|
||||
verify(channel).basicAck(1L, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("异常: projectId 为 null → ACK 丢弃")
|
||||
void shouldAckWhenProjectIdIsNull() throws Exception {
|
||||
// Given
|
||||
String message = "{\"projectName\":\"bad\"}";
|
||||
|
||||
// When
|
||||
consumer.onProjectCreated(message, channel, amqpMessage);
|
||||
|
||||
// Then
|
||||
verify(channel).basicAck(1L, false);
|
||||
verify(projectInitService, never()).initProject(anyLong(), anyList());
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("异常: initProject 抛异常 → NACK")
|
||||
void shouldNackWhenInitProjectThrows() throws Exception {
|
||||
// Given
|
||||
ReflectionTestUtils.setField(consumer, "defaultPackagesCsv", "RESIDENTIAL");
|
||||
when(valueOperations.setIfAbsent(anyString(), anyString(), anyLong(), any(TimeUnit.class)))
|
||||
.thenReturn(true);
|
||||
when(mqConsumeLogMapper.countByEventIdAndGroup(anyString(), anyString())).thenReturn(0);
|
||||
doThrow(new RuntimeException("DB error"))
|
||||
.when(projectInitService).initProject(anyLong(), anyList());
|
||||
|
||||
ProjectCreatedEvent event = new ProjectCreatedEvent();
|
||||
event.setProjectId(1005L);
|
||||
String message = JsonUtils.toJson(event);
|
||||
|
||||
// When
|
||||
consumer.onProjectCreated(message, channel, amqpMessage);
|
||||
|
||||
// Then
|
||||
verify(channel).basicNack(1L, false, false);
|
||||
verify(mqConsumeLogMapper, never()).insert(any(MqConsumeLog.class));
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue