インストール#
docker run --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
--network hm-net \
--hostname my-rabbit \
-e RABBITMQ_DEFAULT_USER=user \
-e RABBITMQ_DEFAULT_PASS=password \
-v mq_plugins:/plugins \
-d rabbitmq:3.8-management
Spring Boot 統合#
送信者#
- 送信者のタイムアウト再接続(非同期)設定
spring:
application:
name: publisher
rabbitmq:
host: 192.168.208.128
port: 5672
username: user
password: password
virtual-host: /hmall
# 確認メカニズムを送信
publisher-confirm-type: correlated
publisher-returns: true
# リトライメカニズム
template:
retry:
enabled: true
max-attempts: 3
initial-interval: 1000
max-interval: 10000
multiplier: 2
server:
port: 7081
- 設定の使用:
- メッセージ変換サービスを設定します。デフォルトは JDK シリアル化で、読みやすくなく、スペースを多く占有します。互換性が低いです。
@Bean
public MessageConverter messageConverter() {
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
- メッセージ送信コールバックを設定
コールバックには二つのケースがあります。一つは交換機に到達すると確認情報が返され、もしキューに到達しなければ ReturnsCallback が呼び出されます。この時は一般的に運用面の問題です。
public void init() {
rabbitTemplate.setReturnsCallback(
(ReturnedMessage returned) -> {
System.out.println("メッセージが失われました: " + returned.getMessage());
}
);
rabbitTemplate.setBeforePublishPostProcessors((message) -> {
MessageProperties messageProperties = message.getMessageProperties();
String messageId = messageProperties.getMessageId();
System.out.println("messageId: " + messageId);
return message;
});
}
JAVA API の使用
- コールバック付きの使用:
public void testRabiitStart() throws InterruptedException {
CorrelationData correlationData = new CorrelationData("1");
// コールバックを設定
rabbitTemplate.setConfirmCallback((correlationData1, ack, cause) -> {
System.out.println("原因: " + cause);
System.out.println("correlationData: " + correlationData1);
if (ack) {
System.out.println("メッセージ送信成功");
} else {
System.out.println("メッセージ送信失敗");
}
});
// キュー名
String queueName = "queue.lyw";
// メッセージを送信します。キューがない場合は自動的に作成されます。
rabbitTemplate.convertAndSend("direct.lyw", queueName, "hello rabbitmq", correlationData);
}
- 遅延キューの使用
public void testRabiitMqDelayMessage() {
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend("delay.direct", "delay.queue", "hello rabbitmq delay",
(Message message) -> {
message.getMessageProperties().setDelayLong(10000L);
return message;
}
, correlationData);
}
消費者#
spring 設定
spring:
application:
name: publisher
rabbitmq:
host: 192.168.208.128
port: 5672
username: user
password: password
virtual-host: /hmall
listener:
simple:
# 自動応答を示し、送信に異常があればリトライし、リトライが成功しなければ死信キューに送信します(三つの戦略)
acknowledge-mode: auto
retry:
enabled: true
max-attempts: 3
initial-interval: 1000
max-interval: 10000
multiplier: 2
stateless: true
server:
port: 7082
交換機、キュー、バインディング関係の作成
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct.lyw");
}
@Bean
public Queue queue() {
return new Queue("queue.lyw");
}
@Bean
public Binding binding(Queue queue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
メッセージリスナーの作成
@RabbitListener(queues = "work.queue1")
public void onMessageWorkerQueue2(String message) throws InterruptedException {
Thread.sleep(200);
Thread thread = Thread.currentThread();
log.info("作業キュー2がメッセージを受信しました: {},時間:{},スレッド{}", message, LocalDateTime.now(), thread.getName());
}
死信キューの初期化
リトライ回数に達した後、三つの戦略があります。
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
遅延キューの使用#
プラグインのインストール(github)
rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ (github.com)
プラグインをプラグインフォルダにコピー
プラグインをインストール
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
交換機、キュー、バインディング関係の作成
@Bean
public Queue delayQueue() {
return new Queue("delay.queue");
}
@Bean
public DirectExchange delayExchange() {
return ExchangeBuilder.directExchange("delay.direct").delayed().durable(true).build();
}
@Bean Binding delayBinding(Queue delayQueue, DirectExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.queue");
}
メッセージを送信
@Test
public void testRabiitMqDelayMessage() {
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend("delay.direct", "delay.queue", "hello rabbitmq delay",
(Message message) -> {
message.getMessageProperties().setDelayLong(10000L);
return message;
}
, correlationData);
}
@Test
public void testRabiitMqDelayMessage2() {
rabbitTemplate.convertAndSend("delay.direct", "delay.queue", "1812793267355439105", message -> {
message.getMessageProperties().setDelayLong(10000L);
return message;
});
}
遅延メッセージの消費:通常のキューと同じです。
この記事は Mix Space によって xLog に同期更新されました
元のリンクは https://me.liuyaowen.club/posts/default/20240820and1