RabbitMQ的工作隊列和路由

RabbitMQ的工作隊列和路由

工作隊列:Working Queue

工作隊列這個概念與簡單的發送/接收消息的區別就是:接收方接收到消息後,可能需要花費更長的時間來處理消息,這個過程就叫一個Work/Task。

幾個概念

分配:多個接收端接收同一個Queue時,如何分配?

消息確認:Server端如何確定接收方的Work已經對消息進行了完整的處理?

消息持久化:發送方、服務端Queue如何對未處理的消息進行磁盤持久化?

Round-robin分配

多個接收端接收同一個Queue時,採用了Round-robin分配算法,即輪叫調度——依次分配給各個接收方。

消息確認

默認開啟了消息確認(接收方接收到消息後,立即向服務器發回確認)。消息接收方處理完消息後,向服務器發送消息確認,服務器再刪除該消息。

對於耗時的work,可以先關閉自動消息確認,在work完成後,再手動發回確認。

channel.basicConsume(“hello”,false/*關閉自動消息確認*/,consumer);

// …work完成後

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

持久化

  1. Server端的Queue持久化

注意的是,如果已經聲明了同名非持久化的Queue,則再次聲明無效。

發送方和接收方都需要指定該參數。

boolean durable = true;

channel.queueDeclare(“task_queue”, durable, false, false, null);

 

  1. Message持久化

channel.basicPublish(“”, “task_queue”, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

 

負載分配

為了解決各個接收端工作量相差太大的問題(有的一直busy,有的空閑比較多),突破Round-robin。

int prefetchCount = 1;

channel.basicQos(prefetchCount);

意思為,最多為當前接收方發送一條消息。如果接收方還未處理完畢消息,還沒有回發確認,就不要再給他分配消息了,應該把當前消息分配給其它空閑接收方。

固定關鍵詞路由:Routing

使用類型為direct的exchange,發送特定關鍵詞(RoutingKey)的消息給訂閱該關鍵詞的Queue。

場景示例:消息發送方發送了類型為[error][info]的兩種消息,寫磁盤的消息接受者只接受error類型的消息,Console打印的接收兩者。

(上圖採用了不同顏色來作為routingKey)

發送方

ConnectionFactory factory = new ConnectionFactory();

factory.setHost(“localhost”);

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, “direct”/*exchange類型為direct*/);

 

channel.basicPublish(EXCHANGE_NAME, “info”/*關鍵詞=info*/, null, message.getBytes());

channel.close();

connection.close();

接收方

ConnectionFactory factory = new ConnectionFactory();

factory.setHost(“localhost”);

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, “direct”/*exchange類型為direct*/);

// 創建匿名Queue

String queueName = channel.queueDeclare().getQueue();

// 訂閱某個關鍵詞,綁定到匿名Queue中

channel.queueBind(quueName,EXCHANGE_NAME,”error”);

channel.queueBind(quueName,EXCHANGE_NAME,”info”);

 

QueueingConsumer consumer = new QueueingConsumer(channel);

channel.basicConsume(queueName, true, consumer);

 

QueueingConsumer.Delivery delivery = consumer.nextDelivery(); // Blocking…

String message = new String(delivery.getBody());

String routingKey = delivery.getEnvelope().getRoutingKey(); // 可獲取路由關鍵詞

關鍵詞模式路由:Topics

這種模式可以看做對Routing的擴展。Routing只能使用固定關鍵詞,而Topics模式可以訂閱模糊關鍵詞。

關鍵詞必須是一組word,由點號分割。例如”xxx.yyy.zzz”,限定255bytes。

* 表示一個word;

# 表示0個或者多個word;

發送方

ConnectionFactory factory = new ConnectionFactory();

factory.setHost(“localhost”);

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, “topic”/*exchange類型*/);

 

channel.basicPublish(EXCHANGE_NAME, “xxx.yyy”/*關鍵詞routingKey*/, null, message.getBytes());

channel.close();

connection.close();

接收方

ConnectionFactory factory = new ConnectionFactory();

factory.setHost(“localhost”);

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, “topic”/*exchange類型*/);

// 創建匿名Queue

String queueName = channel.queueDeclare().getQueue();

// 訂閱某個關鍵詞,綁定到匿名Queue中

channel.queueBind(quueName,EXCHANGE_NAME,”*.yyy”);

 

QueueingConsumer consumer = new QueueingConsumer(channel);

channel.basicConsume(queueName, true, consumer);

 

QueueingConsumer.Delivery delivery = consumer.nextDelivery(); // Blocking…

String message = new String(delivery.getBody());

String routingKey = delivery.getEnvelope().getRoutingKey(); // 可獲取路由關鍵詞

 

 

以下文章點擊率最高

Loading…

     

如果這文章對你有幫助,請掃左上角微信支付-支付寶,給於打賞,以助博客運營