刘耀文

Java developer

Quick Start with RabbitMQ

Installation#

	docker run --name rabbitmq \
	-p 5672:5672 \
	-p 15672:15672 \
	--network hm-net \
	--hostname my-rabbit \
	-e RABBITMQ_DEFAULT_USER=user \
	-e RABBITMQ_DEFAULT_PASS=password \
	-v mq_plugins:/plugins \
	-d rabbitmq:3.8-management

Spring Boot Integration#

Example project: springboot-middlewave-example/springboot-rabbitmq at master · liyown/springboot-middlewave-example (github.com)

Sender#

  1. Sender configuration: asynchronous publisher confirms and returns, plus retry when sending fails (for example on a connection timeout)
spring:  
  application:  
    name: publisher  
  rabbitmq:  
    host: 192.168.208.128  
    port: 5672  
    username: user  
    password: password  
    virtual-host: /hmall  
    # Publisher confirms: "correlated" delivers the confirm asynchronously together with the CorrelationData
    publisher-confirm-type: correlated  
    publisher-returns: true  
    # Retry on the sending side when a send attempt fails
    template:  
      retry:  
        enabled: true  
        max-attempts: 3  
        initial-interval: 1000  
        max-interval: 10000  
        multiplier: 2  
server:  
  port: 7081
  2. Configuration beans:
  • Configure a message converter. The default converter uses JDK serialization, which produces payloads that are not human-readable, are larger than necessary, and can only be read by Java consumers.
	@Bean  
	public MessageConverter messageConverter() {  
	    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();  
	    jackson2JsonMessageConverter.setCreateMessageIds(true);  
	    return  jackson2JsonMessageConverter;  
	}
  • Set the message send callbacks
    There are two callbacks. The ConfirmCallback reports whether the broker accepted the message at the exchange (ack or nack). The ReturnsCallback fires when the message reached the exchange but could not be routed to any queue, which usually points to a configuration or operations problem.
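@PostConstruct // run once after rabbitTemplate has been injected
public void init() {
    // Called when a message reached the exchange but could not be routed to any queue
    rabbitTemplate.setReturnsCallback(
            (ReturnedMessage returned) -> {
                System.out.println("Message returned (unroutable): " + returned.getMessage());
            }
    );
    // Log the message ID created by the converter before each publish
    rabbitTemplate.setBeforePublishPostProcessors((message) -> {
        MessageProperties messageProperties = message.getMessageProperties();
        String messageId = messageProperties.getMessageId();
        System.out.println("messageId: " + messageId);
        return message;
    });
}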
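
To make the converter point above concrete, here is a minimal sketch that compares the size of a JDK-serialized object with an equivalent JSON payload. It uses only the JDK: the JSON string is built by hand as a stand-in for what a JSON converter would produce, and `OrderMessage` with its fields is a hypothetical payload, not a class from the example project.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class SerializationSizeSketch {

    // Hypothetical payload, used only for this comparison.
    static final class OrderMessage implements Serializable {
        private static final long serialVersionUID = 1L;
        final long orderId;
        final String status;

        OrderMessage(long orderId, String status) {
            this.orderId = orderId;
            this.status = status;
        }
    }

    // JDK serialization: a binary stream that also carries class metadata.
    static byte[] jdkBytes(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Hand-written JSON (no escaping), standing in for a JSON converter's output.
    static byte[] jsonBytes(OrderMessage message) {
        String json = "{\"orderId\":" + message.orderId
                + ",\"status\":\"" + message.status + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        OrderMessage message = new OrderMessage(1812793267355439105L, "PAID");
        System.out.println("JDK bytes:  " + jdkBytes(message).length);
        System.out.println("JSON bytes: " + jsonBytes(message).length);
        System.out.println(new String(jsonBytes(message), StandardCharsets.UTF_8));
    }
}
```

The JSON form is plain text that any consumer can parse, while the JDK form is a binary stream tied to the Java class definition.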

Java API usage

  1. Usage with callback:
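@Test
public void testRabiitStart() throws InterruptedException {
    CorrelationData correlationData = new CorrelationData("1");
    // Set the confirm callback. A RabbitTemplate accepts only one ConfirmCallback,
    // so in a real application register it once (for example in init()).
    rabbitTemplate.setConfirmCallback((correlationData1, ack, cause) -> {
        System.out.println("cause: " + cause);
        System.out.println("correlationData: " + correlationData1);
        if (ack) {
            System.out.println("Message sent successfully");
        } else {
            System.out.println("Message sending failed");
        }
    });
    // Routing key; here it is the same as the queue name
    String routingKey = "queue.lyw";

    // Send the message to exchange direct.lyw. Sending does not create the queue:
    // the exchange, queue and binding must already be declared (see the Consumer section).
    rabbitTemplate.convertAndSend("direct.lyw", routingKey, "hello rabbitmq", correlationData);

    // The confirm arrives asynchronously; wait briefly so the callback can run before the test exits
    Thread.sleep(2000);
}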
  2. Sending a delayed message (requires the delayed message exchange plugin, see "Using delay queue")
public void testRabiitMqDelayMessage() {  
    CorrelationData  correlationData = new CorrelationData("1");  
    rabbitTemplate.convertAndSend("delay.direct", "delay.queue", "hello rabbitmq delay",  
                                  (Message message) -> {  
                                      message.getMessageProperties().setDelayLong(10000L);  
                                      return message;  
                                  }  
                                  , correlationData);  
}

Consumer#

Spring configuration

spring:  
  application:  
    name: consumer
  rabbitmq:  
    host: 192.168.208.128  
    port: 5672  
    username: user  
    password: password  
    virtual-host: /hmall  
    listener:  
      simple:  
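        # auto: the container acks when the listener returns normally and rejects when it throws.
        # With retry enabled, a failed message is retried locally; once the attempts are exhausted
        # it is handed to a MessageRecoverer (three strategies, e.g. republish to an error exchange).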
        acknowledge-mode: auto  
        retry:  
          enabled: true  
          max-attempts: 3  
          initial-interval: 1000  
          max-interval: 10000  
          multiplier: 2  
          stateless: true  
server:  
  port: 7082
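
The retry settings above translate into a concrete waiting schedule. The following is a minimal sketch of that arithmetic, assuming the usual exponential backoff semantics (each wait is the previous one times `multiplier`, capped at `max-interval`, and `max-attempts` counts the first delivery). The class and method names are hypothetical and the code does not call Spring itself.

```java
import java.util.ArrayList;
import java.util.List;

final class RetryBackoffSketch {

    /** Waits in milliseconds between consecutive attempts (maxAttempts - 1 entries). */
    static List<Long> waitsBetweenAttempts(int maxAttempts, long initialInterval,
                                           double multiplier, long maxInterval) {
        List<Long> waits = new ArrayList<>();
        long next = initialInterval;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add(Math.min(next, maxInterval)); // cap each wait at max-interval
            next = (long) (next * multiplier);      // grow the wait for the next attempt
        }
        return waits;
    }

    public static void main(String[] args) {
        // Values from the configuration above: 3 attempts, 1000 ms, x2, capped at 10000 ms
        System.out.println(waitsBetweenAttempts(3, 1000, 2.0, 10000));
        // A larger attempt count shows where the cap takes effect
        System.out.println(waitsBetweenAttempts(6, 1000, 2.0, 10000));
    }
}
```

With the configured values the listener is tried three times, waiting 1000 ms and then 2000 ms between attempts, so `max-interval` only matters if `max-attempts` is raised.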

Create exchange, queue, binding relationship

	@Bean  
	public DirectExchange directExchange() {  
	    return new DirectExchange("direct.lyw");  
	}  
	  
	@Bean  
	public Queue queue() {  
	    return new Queue("queue.lyw");  
	}  
	  
	@Bean  
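	// Bind to the DirectExchange declared above (no FanoutExchange bean exists);
	// the binding key must match the routing key used by the sender
	public Binding binding(Queue queue, DirectExchange directExchange) {
	    return BindingBuilder.bind(queue).to(directExchange).with("queue.lyw");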
	}
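
The sender publishes to `direct.lyw` with routing key `queue.lyw`, and a direct exchange delivers a message only to queues whose binding key equals the routing key. The toy model below illustrates that rule in plain Java. It is a sketch of the routing semantics, not broker code, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DirectRoutingSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, key -> new ArrayList<>()).add(queueName);
    }

    /** Queues that would receive a message published with this routing key. */
    List<String> route(String routingKey) {
        return List.copyOf(bindings.getOrDefault(routingKey, List.of()));
    }

    public static void main(String[] args) {
        DirectRoutingSketch directLyw = new DirectRoutingSketch();
        directLyw.bind("queue.lyw", "queue.lyw");

        System.out.println(directLyw.route("queue.lyw"));   // routed to the bound queue
        System.out.println(directLyw.route("unknown.key")); // no match: unroutable
    }
}
```

An empty result corresponds to the unroutable case in which the broker returns the message and the sender's ReturnsCallback is invoked.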

Create message listener

	@RabbitListener(queues = "work.queue1")  
	public void onMessageWorkerQueue2(String message) throws InterruptedException {  
	    Thread.sleep(200);  
	    Thread thread = Thread.currentThread();  
	    log.info("Work queue 2 received message: {}, time: {}, thread: {}", message, LocalDateTime.now(), thread.getName());
	}

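Dead letter queue initialization
Once the retry count is exhausted, the message is handed to a MessageRecoverer. Spring AMQP provides three strategies:
  • RejectAndDontRequeueRecoverer (the default): reject the message without requeueing it; the broker drops it, or dead-letters it if the queue has a dead letter exchange.
  • ImmediateRequeueMessageRecoverer: return the message to the queue so that it is delivered again.
  • RepublishMessageRecoverer: republish the failed message to a designated exchange, as configured here.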

	@Bean  
	public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){  
	    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");  
	}
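
The recoverer above republishes to exchange `error.direct` with routing key `error`, so something has to be bound there to receive the failed messages. A possible declaration is sketched below as a configuration fragment. The queue name `error.queue` and the class name are assumptions for illustration; only the exchange name and routing key come from the recoverer bean.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ErrorQueueConfig {

    // Exchange targeted by the RepublishMessageRecoverer
    @Bean
    public DirectExchange errorExchange() {
        return new DirectExchange("error.direct");
    }

    // Hypothetical queue name; failed messages accumulate here for inspection
    @Bean
    public Queue errorQueue() {
        return new Queue("error.queue");
    }

    // The binding key must equal the routing key passed to the recoverer
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }
}
```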

Using delay queue#

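Download the plugin from GitHub:
rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ (github.com)
Copy the plugin file into the plugins folder (the mq_plugins volume mounted at /plugins in the Installation section).
Enable the plugin: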

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Create exchange, queue, binding relationship

	@Bean  
	public Queue delayQueue() {  
	    return new Queue("delay.queue");  
	}  
	  
	@Bean  
	public DirectExchange delayExchange() {  
	    return ExchangeBuilder.directExchange("delay.direct").delayed().durable(true).build();  
	}  
	  
	@Bean
	public Binding delayBinding(Queue delayQueue, DirectExchange delayExchange) {
	    return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.queue");  
	}

Send message

	@Test  
	public void testRabiitMqDelayMessage() {  
	    CorrelationData  correlationData = new CorrelationData("1");  
	    rabbitTemplate.convertAndSend("delay.direct", "delay.queue", "hello rabbitmq delay",  
	                                  (Message message) -> {  
	                                      message.getMessageProperties().setDelayLong(10000L);  
	                                      return message;  
	                                  }  
	                                  , correlationData);  
	}  
	  
	@Test  
	public void testRabiitMqDelayMessage2() {  
	    rabbitTemplate.convertAndSend("delay.direct", "delay.queue", "1812793267355439105", message -> {  
	        message.getMessageProperties().setDelayLong(10000L);  
	        return message;  
	    });  
	}

Consuming delayed messages works the same way as for ordinary queues.

This article is synchronized to xLog by Mix Space. Original link: https://me.liuyaowen.club/posts/default/20240820and1
