Installation#
docker run --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
--network hm-net \
--hostname my-rabbit \
-e RABBITMQ_DEFAULT_USER=user \
-e RABBITMQ_DEFAULT_PASS=password \
-v mq_plugins:/plugins \
-d rabbitmq:3.8-management
Spring Boot Integration#
Sender#
- Sender timeout reconnect (asynchronous) configuration
spring:
application:
name: publisher
rabbitmq:
host: 192.168.208.128
port: 5672
username: user
password: password
virtual-host: /hmall
# Send confirm mechanism
publisher-confirm-type: correlated
publisher-returns: true
# Retry mechanism
template:
retry:
enabled: true
max-attempts: 3
initial-interval: 1000
max-interval: 10000
multiplier: 2
server:
port: 7081
- Using configuration:
- Configure message conversion service, the default is JDK serialization, which is not easy to read and takes up a lot of space. Compatibility is not high.
@Bean
public MessageConverter messageConverter() {
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
- Set message send callback
There are two callback scenarios: one is that the confirm information will be returned when it reaches the exchange, and if it does not reach the queue, the ReturnsCallback will be called back, which is generally an operation and maintenance issue.
public void init() {
rabbitTemplate.setReturnsCallback(
(ReturnedMessage returned) -> {
System.out.println("Message lost: " + returned.getMessage());
}
);
rabbitTemplate.setBeforePublishPostProcessors((message) -> {
MessageProperties messageProperties = message.getMessageProperties();
String messageId = messageProperties.getMessageId();
System.out.println("messageId: " + messageId);
return message;
});
}
JAVA API usage
- Usage with callback:
public void testRabiitStart() throws InterruptedException {
CorrelationData correlationData = new CorrelationData("1");
// Set callback
rabbitTemplate.setConfirmCallback((correlationData1, ack, cause) -> {
System.out.println("cause: " + cause);
System.out.println("correlationData: " + correlationData1);
if (ack) {
System.out.println("Message sent successfully");
} else {
System.out.println("Message sending failed");
}
});
// Queue name
String queueName = "queue.lyw";
// Send message, if the queue does not exist, it will be created automatically
rabbitTemplate.convertAndSend("direct.lyw", queueName, "hello rabbitmq", correlationData);
}
- Using delay queue
public void testRabiitMqDelayMessage() {
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend("delay.direct", "delay.queue", "hello rabbitmq delay",
(Message message) -> {
message.getMessageProperties().setDelayLong(10000L);
return message;
}
, correlationData);
}
Consumer#
Spring configuration
spring:
application:
name: publisher
rabbitmq:
host: 192.168.208.128
port: 5672
username: user
password: password
virtual-host: /hmall
listener:
simple:
# Indicates automatic acknowledgment, when sending fails, it will retry, if retry fails, it will send to the dead letter queue (three strategies)
acknowledge-mode: auto
retry:
enabled: true
max-attempts: 3
initial-interval: 1000
max-interval: 10000
multiplier: 2
stateless: true
server:
port: 7082
Create exchange, queue, binding relationship
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct.lyw");
}
@Bean
public Queue queue() {
return new Queue("queue.lyw");
}
@Bean
public Binding binding(Queue queue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
Create message listener
@RabbitListener(queues = "work.queue1")
public void onMessageWorkerQueue2(String message) throws InterruptedException {
Thread.sleep(200);
Thread thread = Thread.currentThread();
log.info("Work queue 2 received message: {},Time:{}, Thread{}", message, LocalDateTime.now(), thread.getName());
}
Dead letter queue initialization
After reaching the retry count, there are three strategies:
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
Using delay queue#
Install plugin (github)
rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ (github.com)
Copy plugin to plugin folder
Install plugin
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Create exchange, queue, binding relationship
@Bean
public Queue delayQueue() {
return new Queue("delay.queue");
}
@Bean
public DirectExchange delayExchange() {
return ExchangeBuilder.directExchange("delay.direct").delayed().durable(true).build();
}
@Bean Binding delayBinding(Queue delayQueue, DirectExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.queue");
}
Send message
@Test
public void testRabiitMqDelayMessage() {
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend("delay.direct", "delay.queue", "hello rabbitmq delay",
(Message message) -> {
message.getMessageProperties().setDelayLong(10000L);
return message;
}
, correlationData);
}
@Test
public void testRabiitMqDelayMessage2() {
rabbitTemplate.convertAndSend("delay.direct", "delay.queue", "1812793267355439105", message -> {
message.getMessageProperties().setDelayLong(10000L);
return message;
});
}
Consume delayed messages: Same as ordinary queues
This article is synchronized and updated to xLog by Mix Space Original link: https://me.liuyaowen.club/posts/default/20240820and1