刘耀文

刘耀文

java开发者
github

RabbitMQの迅速な使用

インストール#

	docker run --name rabbitmq \
	-p 5672:5672 \
	-p 15672:15672 \
	--network hm-net \
	--hostname my-rabbit \
	-e RABBITMQ_DEFAULT_USER=user \
	-e RABBITMQ_DEFAULT_PASS=password \
	-v mq_plugins:/plugins \
	-d rabbitmq:3.8-management

Spring Boot 統合#

example:springboot-middlewave-example/springboot-rabbitmq at master · liyown/springboot-middlewave-example (github.com)

送信者#

  1. 送信者のタイムアウト再接続(非同期)設定
spring:  
  application:  
    name: publisher  
  rabbitmq:  
    host: 192.168.208.128  
    port: 5672  
    username: user  
    password: password  
    virtual-host: /hmall  
    # 確認メカニズムを送信
    publisher-confirm-type: correlated  
    publisher-returns: true  
    # リトライメカニズム
    template:  
      retry:  
        enabled: true  
        max-attempts: 3  
        initial-interval: 1000  
        max-interval: 10000  
        multiplier: 2  
server:  
  port: 7081
  1. 設定の使用:
  • メッセージ変換サービスを設定します。デフォルトは JDK シリアル化で、読みやすくなく、スペースを多く占有します。互換性が低いです。
	@Bean  
	public MessageConverter messageConverter() {  
	    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();  
	    jackson2JsonMessageConverter.setCreateMessageIds(true);  
	    return  jackson2JsonMessageConverter;  
	}
  • メッセージ送信コールバックを設定
    コールバックには二つのケースがあります。一つは交換機に到達すると確認情報が返され、もしキューに到達しなければ ReturnsCallback が呼び出されます。この時は一般的に運用面の問題です。
    image.png|600
public void init() {  
    rabbitTemplate.setReturnsCallback(  
            (ReturnedMessage returned) -> {  
                System.out.println("メッセージが失われました: " + returned.getMessage());  
            }  
    );  
    rabbitTemplate.setBeforePublishPostProcessors((message) -> {  
        MessageProperties messageProperties = message.getMessageProperties();  
        String messageId = messageProperties.getMessageId();  
        System.out.println("messageId: " + messageId);  
        return message;  
    });  
  
}

JAVA API の使用

  1. コールバック付きの使用:
public void testRabiitStart() throws InterruptedException {  
  
    CorrelationData  correlationData = new CorrelationData("1");  
    // コールバックを設定  
    rabbitTemplate.setConfirmCallback((correlationData1, ack, cause) -> {  
        System.out.println("原因: " + cause);  
        System.out.println("correlationData: " + correlationData1);  
        if (ack) {  
            System.out.println("メッセージ送信成功");  
        } else {  
            System.out.println("メッセージ送信失敗");  
        }  
    });  
    // キュー名  
    String queueName = "queue.lyw";  
  
  
    // メッセージを送信します。キューがない場合は自動的に作成されます。  
    rabbitTemplate.convertAndSend("direct.lyw", queueName, "hello rabbitmq", correlationData);  
  
}
  1. 遅延キューの使用
public void testRabiitMqDelayMessage() {  
    CorrelationData  correlationData = new CorrelationData("1");  
    rabbitTemplate.convertAndSend("delay.direct", "delay.queue", "hello rabbitmq delay",  
                                  (Message message) -> {  
                                      message.getMessageProperties().setDelayLong(10000L);  
                                      return message;  
                                  }  
                                  , correlationData);  
}

消費者#

spring 設定

spring:  
  application:  
    name: publisher  
  rabbitmq:  
    host: 192.168.208.128  
    port: 5672  
    username: user  
    password: password  
    virtual-host: /hmall  
    listener:  
      simple:  
        # 自動応答を示し、送信に異常があればリトライし、リトライが成功しなければ死信キューに送信します(三つの戦略)
        acknowledge-mode: auto  
        retry:  
          enabled: true  
          max-attempts: 3  
          initial-interval: 1000  
          max-interval: 10000  
          multiplier: 2  
          stateless: true  
server:  
  port: 7082

交換機、キュー、バインディング関係の作成

	@Bean  
	public DirectExchange directExchange() {  
	    return new DirectExchange("direct.lyw");  
	}  
	  
	@Bean  
	public Queue queue() {  
	    return new Queue("queue.lyw");  
	}  
	  
	@Bean  
	public Binding binding(Queue queue, FanoutExchange fanoutExchange) {  
	    return BindingBuilder.bind(queue).to(fanoutExchange);  
	}

メッセージリスナーの作成

	@RabbitListener(queues = "work.queue1")  
	public void onMessageWorkerQueue2(String message) throws InterruptedException {  
	    Thread.sleep(200);  
	    Thread thread = Thread.currentThread();  
	    log.info("作業キュー2がメッセージを受信しました: {},時間:{},スレッド{}", message, LocalDateTime.now(), thread.getName());  
	}

死信キューの初期化
リトライ回数に達した後、三つの戦略があります。
image.png|600

	@Bean  
	public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){  
	    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");  
	}

遅延キューの使用#

プラグインのインストール(github)
rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ (github.com)
プラグインをプラグインフォルダにコピー
プラグインをインストール

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

交換機、キュー、バインディング関係の作成

	@Bean  
	public Queue delayQueue() {  
	    return new Queue("delay.queue");  
	}  
	  
	@Bean  
	public DirectExchange delayExchange() {  
	    return ExchangeBuilder.directExchange("delay.direct").delayed().durable(true).build();  
	}  
	  
	@Bean Binding delayBinding(Queue delayQueue, DirectExchange delayExchange) {  
	    return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.queue");  
	}

メッセージを送信

	@Test  
	public void testRabiitMqDelayMessage() {  
	    CorrelationData  correlationData = new CorrelationData("1");  
	    rabbitTemplate.convertAndSend("delay.direct", "delay.queue", "hello rabbitmq delay",  
	                                  (Message message) -> {  
	                                      message.getMessageProperties().setDelayLong(10000L);  
	                                      return message;  
	                                  }  
	                                  , correlationData);  
	}  
	  
	@Test  
	public void testRabiitMqDelayMessage2() {  
	    rabbitTemplate.convertAndSend("delay.direct", "delay.queue", "1812793267355439105", message -> {  
	        message.getMessageProperties().setDelayLong(10000L);  
	        return message;  
	    });  
	}

遅延メッセージの消費:通常のキューと同じです。

この記事は Mix Space によって xLog に同期更新されました
元のリンクは https://me.liuyaowen.club/posts/default/20240820and1


読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。