添加邮件发送死信队列和过期机制,发送失败自动处理进入数据库

This commit is contained in:
柏码の讲师 2025-01-10 01:14:40 +08:00
parent 2b15ff7a3c
commit 1624b00522
9 changed files with 157 additions and 15 deletions

View File

@ -1,7 +1,9 @@
package com.example.config; package com.example.config;
import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.*;
import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@ -10,10 +12,42 @@ import org.springframework.context.annotation.Configuration;
*/ */
@Configuration @Configuration
public class RabbitConfiguration { public class RabbitConfiguration {
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean("errorQueue")
public Queue dlQueue(){
return QueueBuilder
.durable("error")
.ttl(24 * 60 * 60 * 1000)
.build();
}
@Bean("errorExchange")
public Exchange dlExchange(){
return ExchangeBuilder.directExchange("dlx.direct").build();
}
@Bean("dlBinding") //死信交换机和死信队列进绑定
public Binding dlBinding(@Qualifier("errorExchange") Exchange exchange,
@Qualifier("errorQueue") Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("error-message")
.noargs();
}
@Bean("mailQueue") @Bean("mailQueue")
public Queue queue(){ public Queue queue(){
return QueueBuilder return QueueBuilder
.durable("mail") .durable("mail")
.deadLetterExchange("dlx.direct")
.deadLetterRoutingKey("error-message")
.ttl(3 * 60 * 1000)
.build(); .build();
} }
} }

View File

@ -0,0 +1,30 @@
package com.example.entity;
import lombok.Getter;
import lombok.ToString;
import java.util.HashMap;
import java.util.Map;
@Getter
@ToString
public class QueueMessage {
private String messageType;
private final Map<String, Object> data = new HashMap<>();
public static QueueMessage create(String messageType) {
QueueMessage queueMessage = new QueueMessage();
queueMessage.messageType = messageType;
return queueMessage;
}
public QueueMessage put(String key, Object value) {
data.put(key, value);
return this;
}
@SuppressWarnings("unchecked")
public <T> T get(String key) {
return (T) data.get(key);
}
}

View File

@ -0,0 +1,21 @@
package com.example.entity.dto;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.Date;
@Data
@Accessors(chain = true)
@TableName("db_error_verify_mail")
public class VerifyMailError {
@TableId(type = IdType.AUTO)
Integer id;
String email;
String type;
String code;
Date time;
}

View File

@ -0,0 +1,37 @@
package com.example.listener;
import com.example.entity.QueueMessage;
import com.example.entity.dto.VerifyMailError;
import com.example.mapper.VerifyMailErrorMapper;
import com.example.utils.Const;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
@RabbitListener(queues = Const.MQ_ERROR)
public class ErrorQueueListener {
@Resource
VerifyMailErrorMapper mapper;
@RabbitHandler
public void saveErrorToDatabase(QueueMessage message) {
log.error("出现一条错误的队列消息: {}", message);
switch (message.getMessageType()) {
case "email" -> {
VerifyMailError error = new VerifyMailError()
.setCode(message.get("code").toString())
.setType(message.get("type"))
.setEmail(message.get("email"))
.setTime(new Date());
mapper.insert(error);
}
}
}
}

View File

@ -1,6 +1,9 @@
package com.example.listener; package com.example.listener;
import com.example.entity.QueueMessage;
import com.example.utils.Const;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@ -8,13 +11,12 @@ import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender; import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Map;
/** /**
* 用于处理邮件发送的消息队列监听器 * 用于处理邮件发送的消息队列监听器
*/ */
@Slf4j
@Component @Component
@RabbitListener(queues = "mail") @RabbitListener(queues = Const.MQ_MAIL, concurrency = "10")
public class MailQueueListener { public class MailQueueListener {
@Resource @Resource
@ -25,13 +27,13 @@ public class MailQueueListener {
/** /**
* 处理邮件发送 * 处理邮件发送
* @param data 邮件信息 * @param message 邮件信息
*/ */
@RabbitHandler @RabbitHandler
public void sendMailMessage(Map<String, Object> data) { public void sendMailMessage(QueueMessage message) {
String email = data.get("email").toString(); String email = message.get("email"), type = message.get("type");
Integer code = (Integer) data.get("code"); Integer code = message.get("code");
SimpleMailMessage message = switch (data.get("type").toString()) { SimpleMailMessage mailMessage = switch (type) {
case "register" -> case "register" ->
createMessage("欢迎注册我们的网站", createMessage("欢迎注册我们的网站",
"您的邮件注册验证码为: "+code+"有效时间3分钟为了保障您的账户安全请勿向他人泄露验证码信息。", "您的邮件注册验证码为: "+code+"有效时间3分钟为了保障您的账户安全请勿向他人泄露验证码信息。",
@ -46,8 +48,9 @@ public class MailQueueListener {
email); email);
default -> null; default -> null;
}; };
if(message == null) return; if(mailMessage == null) return;
sender.send(message); log.info("正在向 {} 发送 {} 类型的电子邮件...", email, type);
sender.send(mailMessage);
} }
/** /**

View File

@ -0,0 +1,9 @@
package com.example.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.entity.dto.VerifyMailError;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface VerifyMailErrorMapper extends BaseMapper<VerifyMailError> {
}

View File

@ -2,6 +2,7 @@ package com.example.service.impl;
import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.entity.QueueMessage;
import com.example.entity.dto.Account; import com.example.entity.dto.Account;
import com.example.entity.dto.AccountDetails; import com.example.entity.dto.AccountDetails;
import com.example.entity.dto.AccountPrivacy; import com.example.entity.dto.AccountPrivacy;
@ -23,7 +24,6 @@ import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.Date; import java.util.Date;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -86,8 +86,9 @@ public class AccountServiceImpl extends ServiceImpl<AccountMapper, Account> impl
return "请求频繁,请稍后再试"; return "请求频繁,请稍后再试";
Random random = new Random(); Random random = new Random();
int code = random.nextInt(899999) + 100000; int code = random.nextInt(899999) + 100000;
Map<String, Object> data = Map.of("type",type,"email", email, "code", code); QueueMessage message = QueueMessage.create("email");
rabbitTemplate.convertAndSend(Const.MQ_MAIL, data); message.put("type",type).put("email", email).put("code", code);
rabbitTemplate.convertAndSend(Const.MQ_MAIL, message);
stringRedisTemplate.opsForValue() stringRedisTemplate.opsForValue()
.set(Const.VERIFY_EMAIL_DATA + email, String.valueOf(code), 3, TimeUnit.MINUTES); .set(Const.VERIFY_EMAIL_DATA + email, String.valueOf(code), 3, TimeUnit.MINUTES);
return null; return null;

View File

@ -21,6 +21,7 @@ public final class Const {
public final static String ATTR_USER_ID = "userId"; public final static String ATTR_USER_ID = "userId";
//消息队列 //消息队列
public final static String MQ_MAIL = "mail"; public final static String MQ_MAIL = "mail";
public final static String MQ_ERROR = "error";
//用户角色 //用户角色
public final static String ROLE_DEFAULT = "user"; public final static String ROLE_DEFAULT = "user";
public final static String ROLE_ADMIN = "admin"; public final static String ROLE_ADMIN = "admin";

View File

@ -13,6 +13,12 @@ spring:
username: admin username: admin
password: admin password: admin
virtual-host: / virtual-host: /
listener:
simple:
retry:
enabled: true
max-attempts: 3
initial-interval: 1000ms
datasource: datasource:
url: jdbc:mysql://localhost:3306/study url: jdbc:mysql://localhost:3306/study
username: root username: root