Rabbitmq

Published: by Creative Commons Licence

  • Tags:

快速开始

安装

我这边使用的是docker,参考https://github.com/micahhausler/rabbitmq-compose

首先clone项目:

git clone git@github.com:micahhausler/rabbitmq-compose.git
cd rabbitmq-compose
docker-compose up

之后访问http://$(docker-machine ip default):15672/就可以看到管理界面,用户名和密码均为rabbitmq,服务端口则是5672

Hello World!

准备

首先你要向你的POM中加入下面依赖:

<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-client</artifactId>
</dependency>

之后创建渠道的步骤如下:

final String QUEUE_NAME = "test01";
final String HOST = "192.168.1.6";
final String USERNAME = "rabbitmq";
final String PASSWORD = "rabbitmq";


ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
try (Connection connection = factory.newConnection()) {
    Channel channel = connection.createChannel();
}

一个连接可以创建多个渠道,而连接是线程安全的,而渠道是非线程安全的。你可以通过渠道发布或消费消息。

之后不再演示如何创建渠道对象。

生产和消费

RabbitMQ是一个消息中间件,它接收和转发消息。你可以将其视作一个邮局。当你将邮件放到某个邮筒中时,你能确定邮递员最终会把邮件递送到收件人手上。在上面这个比喻中,RabbitMQ起到了邮筒,邮局和邮递员的作用。

RabbitMQ与邮局的主要区别就是RabbitMQ不递送纸张,它递送的数据是二进制数据块。

RabbitMQ以及通用消息中间件,使用了一些术语。

生产意味着发送,发送消息的程序就是生产者。

队列是存在于RabbitMQ中邮筒的名称。队列仅受限与内存和磁盘的大小,它实际上是一个很大对的消息缓冲区。同时可以有多个生产者向一个队列发送消息,同样同时可以有多个消费者从一个队列消费数据。

消费意味着收取,消费者是几乎一直在等着收消息的程序。

注意生产者,消费者和中间件不需要存在于一台主机上。一个应用也可以同时扮演消费者和生产者两个角色。 下面是生产者代码:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent `" + message + "`");

创建队列是幂等操作,RabbitMQ仅在队列不存在时创建它。

下面是消费者代码:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody());
    System.out.println("[x] Received `" + message + "`");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
System.in.read();

工作队列

准备

在上一个例子中我们写了程序向命名队列中发送数据以及从中拉取数据。这次我们要创建一个工作队列,用于向多个工人分配耗时任务。

工作队列背后的主要思想是避免直接执行一个消耗很多资源的任务并等待他完成,取而代之的是我们调度这个任务在之后被执行。我们将任务封装为消息并发送到队列中,后台的工作进程则拉取任务并最终执行它们。如果你运行了多个工作进程,那么任务就可以在它们之间分散。

这个观念在Web应用中特别有用,因为你不可能在一次短暂的HTTP请求窗体中处理一个复杂的任务。

上一个向导中我们发送了字符串,现在我们发送包含了复杂任务信息的字符串。我们没有准备一个真实世界的任务,比如改变图片大小或是渲染PDF文件,但是我们可以虚构一个任务并假装自己很忙,通过使用Thread.sleep函数。我们用任务中包含的点号.数目来理解任务的复杂度,每个点号消耗一秒。

消息的发送者可以借鉴之前的代码,消费者代码如下:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
			String message = new String(delivery.getBody());
            try {
                for (char c : message.toCharArray()) {
                    if (c == '.') {
                        Thread.sleep(1000);
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("[x] Received `" + message + "`");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
System.in.read();

之后启动多个消费者后发送多次消息,会发送所有消费者接收到的消息的数量基本一致。

Round-robin分配

工作队列的一个好处就是平摊工作。默认情况下RabbitMQ会将每个消息发送到下一个消费者手上。平均下来每个消费者就会得到相同数目的消息。这种分配方法的名字叫做round-robin

消息确认

完成一个任务需要数秒,你可能会好奇如果一个消费者在仅完成部分任务的情况下死去,将会发生什么。对于我们现有的代码,一旦RabbtMQ将消息发送给客户端,它就会立即被标记为待删除。此时我们杀死一个工人进程,那么他正在处理的消息将永久丢失。

但是我们不想丢失任何任务,如果一个工作进程死去,我们希望消息发送给另外一个工作进程。

为了保证消息永远不会丢失,RabbitMQ提供了消息确认机制(message acknowledgment)。客户端向RabbitMQ回复一个ack表示接受到并处理了消息,此后RabbitMQ可以自由选择是否删除它。

如果一个客户端在未回复ack之前死去,RabbitMQ将任务消息未被处理完成,并会将消息重新加入队列中。如果此时还有其余在线工作进程,它会立即将消息发送给它们。这种方式保证了消息永远不会丢失,即使工作进程偶然死去。

消息没有超时机制,即直到消费者死去RabbitMQ才会重新发送消息,因此你可以花很长的时间处理一条消息。

默认情况下是需要手动确认消息的,之前的例子中我们显式地通过传递参数autoAck=true关闭了它。现在是时候让工作进程来手动确认消息了。

变更消费者代码如下:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
			String message = new String(delivery.getBody());
            try {
                for (char c : message.toCharArray()) {
                    if (c == '.') {
                        Thread.sleep(1000);
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("[x] Received `" + message + "`");
    		channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
System.in.read();

Ack必须发送给delivery所属的channel,使用不同的channel来确认消息将会抛出异常。

忘记确认消息会造成消息的不断堆积最终导致RabbitMQ无法工作。

消息持久化

尽管任务不会丢失我们的消息,但是如果RabbitMQ服务器停止了,那么消息依旧会被丢失。

一旦RabbitMQ退出,它将忘记所有的队列和消息除非你告诉它要记住他们。要保证消息不丢失,必须做两件事情:标记队列和消息均为持久化。

boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

上面的代码在我们的例子中无法生效,因为我们之前已经创建了一个同名队列,RabbitMQ不会允许你重新定义配置不同的现存队列,因此会直接返回错误。这里有一个便捷的变通方案,就是创建一个不同名字的队列:

现在我们保证了队列不会被忘记。接下来我们需要标记我们的消息为持久化,通过设置消息属性为PERSISTENT_TEXT_PLAIN

channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

将消息标记为持久化并不能完全完全避免消息的丢失,因为在RabbitMQ接收到消息到RabbitMQ实际持久化消息之间存在一个间隔,此时消息保存在内存中,一旦此时RabbitMQ宕机,那么消息就会永久丢失。因此不仅消费者需要确认机制,消息生产者也需要确认机制。

公平分配

你会发现工作队列工作的并不如你想象中那么好,比如我们启动两个工作进程,其中奇数任务都是比较费时的任务,而偶数任务则都是比较轻松的任务,那么我们会发现一个工作进程一直忙于工作,而另外一个基本处于空闲状态。

为了解决这个问题,我们需要使用basicQos(1)方法。这个方法相当于告诉RabbitMQ不要一次性发送给消费者超过一个消息。换言之,让RabbitMQ在自己尚未确认消息之前不要再发送新消息过来。这样RabbitMQ就会把消息分配给下一个可以接受消息的消费者。

channel.basicQos(1);

如果你的消费者都处于繁忙状态,你的队列可能被填充满。你应该注意这种情况,或许增加工作进程数目,或者使用其它一些策略。

发布/订阅

之前的案例中我们创建了工作队列。工作队列建立在一个任务仅分配给一个工作进程的架设上。在这部分我们将做些完全不同的事情——我们将把一个消息同时递送给多个消费者。这种模式称为“发布/订阅”。

为了展示这种模式,我们要构建一个简单的日志系统。它由两部分程序组成,第一部分发送日志消息,而第二部分则接收和打印它们。

在我们的日志系统中,每一个运行的接受者都会得到消息。这样我们可以运行一个接受者,它不断地接收消息,并记录到硬盘上,而同时我们运行另外一个接收者,它不断地接收消息并打印到屏幕上。

基本上,发布的日志消息会广播给所有的接收者。

交换器

之前的向导中,我们向队列发送和拉取消息。现在是时候向你们介绍Rabbit中的完整的发送模型。

让我们快速过一遍之前向导中的内容:

  • 生产者是用于发送消息的应用程序。
  • 队列是保存消息的缓冲区。
  • 消费者是接收消息的应用程序。

RabbitMQ中发送模型的核心想法是:生产者不直接将消息发送到队列。实际上,大部分生产者根本不知道消息是否会发送到任何一个队列中。

取而代之的,生产者仅发送消息给交换器(exchange)。交换器是非常简单的东西,一方面它从生产者处接收消息,另一方面,它们将消息推入队列中。一个交换器必须知道它该怎么处理它收到的消息。它应该追加到某个队列中吗?它应该同时加入多个队列中吗?还是它应该被丢弃掉。这样的处理规则由交换器类型所决定。

有数种交换器类型:direct,topic,headers和fanout。我们要关注最后一种——fanout。让我们创建该类型的一个交换器,称之为logs:

channel.exchangeDeclare("logs", "fanout");

fanout类型的交换器非常简单,跟名字的含义一样(发散),它仅将消息加入到所有它知道的队列。这正是我们的日志所需要的。

无名交换器

之前的向导中我们对交换器一无所知,但是我们还是可以成功将消息发送到队列中。这是由于我们使用了一个默认的交换器,它的标识符为空字符串“”。

回想我们是如何发送消息的:

channel.basicPublic("", "hello", null, message.getBytes());

第一个参数就是交换器的名字,空字符串对应的就是默认或无名交换器。消息会发送到与有着与路由键(第二个参数)相同名字的队列中去。

现在,我们就可以发送给我们的带名字的交换器了:

channel.basicPublic("logs", "", null, message.getBytes());

临时队列

就如你所记得的,我们使用了一个带名字的队列,能命名一个队列对我们非常重要——我们需要将工作者指向相同的队列。当你需要在生产者和消费者之间分享队列时,为队列提供一个名字非常重要。

但是这不符合我们的日志系统,我们希望能听到所有的日志消息,不仅仅是一个子集。我们也仅对正在流动的而非之前的消息感兴趣。要解决这些问题,我们需要两个东西。

首先,无论何时我们连接到Rabbit,我们需要一个新鲜的空队列。要实现这个,我们可以创建一个随机名字的队列,或者,让服务器为你挑选一个随机名。

其次,一旦我们断开消费者和队列之间的联系,队列应该被自动删除掉。

在Java客户端代码中,如果我们不为queueDeclare方法提供任何参数,那么就会创建一个非持久化的、互斥的、自动删除的队列,自带一个自动生成的名字。

String queueName = channel.queueDeclare().getQueue();

绑定

我们已经创建了一个发散交换器和一个队列,现在我们需要告诉交换区将消息发送到我们的队列。交换器和队列之间的关联称为绑定(binding)。

channel.queueBind(queueName, "logs", "");

从现在起,logs交换器会把消息追加到我们的队列中。

将它放在一起

发送消息的生产者程序,与之前的向导中的生产者差别不大。最重要的差别就是我们现在将消息发送到logs交换器而非无名交换器。我们在发送前需要提供路由键,但是它的值对于发散交换器是不起作用的。

生产者代码:

channel.exchangeDeclare(EXCHANGE, "fanout");
String message = "info: Hello World!";
channel.basicPublish(EXCHANGE, "", null, message.getBytes());
System.out.println("[x] Sent `" + message + "`");

消费者代码:

channel.exchangeDeclare(EXCHANGE, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE, "");
System.out.println("Create a queue named " + queueName);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("[x] Received `" + message + "`");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
System.in.read();

路由

之前的向导中我们构建了一个简单的日志系统。我们可以一次性将消息广播给多个接收者。

在这个向导中我们准备为它再增加一个特性——我们允许它仅订阅一部分的消息。比如说,我们可以将其中严重错误的信息记录到日志文件中(来节省硬盘空间),并将所有消息继续打印到控制台。

绑定

之前的向导中我们已经创建一个绑定。你可以回想起代码:

channel.queueBind(queueName, EXCHANGE, "");

绑定是指交换器和队列之间的联系。可以简单解读为,这个队列对来自交换器的消息感兴趣。

绑定需要一个额外的路由键参数。为了避免和basicPublic的参数混淆,我们称之为绑定键。我们可以通过下面代码创建绑定:

channel.queueBind(queueName, EXCHANGE, "black");

绑定键的含义取决于交换器类型,对于发散交换器,我们之前用过的,简单地忽略了它的值。

直连交换器

之前向导中我们的日志系统将所有消息广播给了所有的消费者。我们希望对其进行扩展,以支持基于消息的严重性进行过滤。比如我们可能希望一个程序负责将严重消息写入到硬盘中,而不把硬盘空间浪费在其它级别的消息上。

我们使用了一个发散交换器,它无法为我们提供足够的灵活性。它仅支持无意识地广播。

我们将使用一个直连交换器取代它。在直连交换器后的路由算法很简单——消息将会进入到绑定键匹配自己路由键的队列。

在上图中,我们可以看到直连交换器x关联两个队列,第一个队列的绑定键是orange,第二个队列有两个绑定键,black和green。

在上图中,发布给交换器的带有路由键orange的消息将会被导向Q1,而带有路由键black或green的消息被导向Q2。其余所有消息都会被丢弃。

重复绑定

在一个交换器和多个队列之间利用相同的绑定键进行关联是完全合法的。此时,直连交换器将会和发散交换器拥有类似的表现,他会将消息追加到所有匹配的队列中。在上图中,带有路由键black的消息将会同时被加入到Q1和Q2中。

发布日志

我们将使用这种发布模型为我们的日志系统服务。我们将用直连交换器取代发散交换器。我们将把日志严重性作为路由键。通过这种方式,接受者可以选择自己要接收的日志的严重性。让我们首先关注日志的发布。

一如既往,我们首先要创建一个交换器:

channel.exchangeDeclare(EXCHANGE, "direct");

并且我们准备好发送一条消息了:

channel.basicPublic(EXCHANGE, serverity, null, message.getBytes());

为了简化系统,我们假设严重仅为info或warning或error。

订阅

收取消息表现地和之前的向导类似,除了一个区别——我们将为每个感兴趣的严重性创建一个新的绑定。

把它放在一起

生产者代码:

channel.exchangeDeclare(EXCHANGE, "direct");
String info = "info: Hello World!";
String warn = "warn: Hello World!";
String error = "error: Hello World!";
channel.basicPublish(EXCHANGE, "info", null, info.getBytes());
channel.basicPublish(EXCHANGE, "warn", null, warn.getBytes());
channel.basicPublish(EXCHANGE, "error", null, error.getBytes());
System.out.println("[x] Sent `" + "all" + "`");

消费者代码:

channel.exchangeDeclare(EXCHANGE, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE, "error");
channel.queueBind(queueName, EXCHANGE, "warn");
System.out.println("Create a queue named " + queueName);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("[x] Received `" + message + "`");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
System.in.read();

主题(Topic)

上一个向导中我们改进了日志系统,替换了只能广播的愚笨的发散交换器。我们使用了直连交换器,并获得了选择性接受日志的能力。

虽然直连交换器改进了我们的系统,它依旧有一些不足——它不能根据多个条件进行路由。

在我们的日志系统中,我们可能希望通过严重性以外的条件订阅消息,但是不改变发送日志的提供者代码。你可能从unix工具syslog中了解过这些概念,它按照严重性和功能模块路由日志。

它将给给我们提供高灵活性——我们可能希望监听来自cron的所有严重错误以及来自kern的所有日志。

要在我们系统实现这种灵活性,我们需要学习一种更复杂的主题交换器。

主题交换器

发送给主题交换器的消息不允许带有任何路由键——它必须是一组单词的列表,由点号分隔。单词可以是任何东西,但是它们通常表示与消息关联的一些特性。比如说:"stock.used.nyse",或者"nyse.vmw","quick.orange.rabbit"。你可以在路由键中包含任意多的单词,只要总大小不超过255字节。

同样主题交换器的绑定键也必须拥有相同的格式。主题交换器背后的逻辑与直连交换器类似——一个带特殊路由键的消息将会被递送到所有与交换器通过匹配的绑定键关联的队列中。但是这里有两种重要的特殊情况处理:

  • *(星号)可以指代一个单词
  • #(hash)可以指代任意多个单词

在上面这个例子中,我们将发送描述动物的消息。消息带一个由三个单词组成的路由键,其格式为"<速度>.<颜色>.<种类>"。

我们也如图创建了三个绑定键。我们可以做此解释:

  • Q1仅接受颜色为橘色的动物的消息。
  • Q2仅接受兔子的消息和速度慢的动物的信息。

但是如果我们违法了协议,发送了一个由一个或四个单词组成的路由键会发生什么?比如"orange"或"quick.orange.male.rabbit"?这些消息将不会被递送到任何队列中,但是如果你使用的路由键是"lazy"或"lazy.orange.male.rabbit",那么仅Q2会收到消息。

主题交换器非常强大,并且能支持其它交换器所有的一切功能。

当队列绑定到带#的绑定键上,它会收到所有的消息,无论该消息的路由键是什么——就像发散交换器。

如果绑定键不包含特殊字符("*"和"#"),主题交换器就会表现的向直连交换器一样。

放在一起

我们要在我们的日志系统中使用主题交换器。我们将假设所有的日志消息的路由键格式为"<facility>.<serverity>"。

代码和之前基本没什么变化。

消息提供者:

channel.exchangeDeclare(EXCHANGE, "topic");
String info = "info: Hello World!";
String warn = "warn: Hello World!";
String error = "error: Hello World!";
channel.basicPublish(EXCHANGE, "auth.info", null, info.getBytes());
channel.basicPublish(EXCHANGE, "auth.warn", null, warn.getBytes());
channel.basicPublish(EXCHANGE, "auth.error", null, error.getBytes());
channel.basicPublish(EXCHANGE, "task.info", null, info.getBytes());
channel.basicPublish(EXCHANGE, "task.warn", null, warn.getBytes());
channel.basicPublish(EXCHANGE, "task.error", null, error.getBytes());
System.out.println("[x] Sent `" + "all" + "`");

消息消费者:

channel.exchangeDeclare(EXCHANGE, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE, "*.error");
channel.queueBind(queueName, EXCHANGE, "task.*");
System.out.println("Create a queue named " + queueName);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("[x] Received `" + message + "`");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
System.in.read();

远程过程调用

在第二个向导中我们学习了如何使用队列来将耗时任务分配给多个工作进程。

但是如果我们需要调用远程计算机上的一个函数并等待结果返回呢?嗯,这确实是一个不同的故事。这种模式作为远程方法调用被广泛认知。

在这一节向导中,我们准备使用RabbitMQ来构建一个RPC系统,一个客户端以及一个可伸缩的服务器。由于我们并没有值得分配的耗时任务,因此我们准备创建一个愚笨的RPC服务器返回斐波那契数。

客户端接口

为了展示如何使用一个RPC服务,我们准备构建一个简单的客户端类。它准备向外暴露一个名字是call的方法,方法体为发送一个RPC请求并阻塞直到收到答案。

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

RPC的一个注意点

虽然RPC是一个计算上相当通用的模式,但是它经常被批评。当一个程序员没有意识到他一个方法是本地的还是远程过程调用,那么就会发生问题。像这种疑惑会导致系统不可预料,并且为调试带来不必要的复杂度。对RPC的错误使用非但不能简化系统,还会导致意大利面条式代码的出现。

牢牢记在心里,考虑下面的意见:

对于一个函数,它是本地调用还是远程调用要非常明显。

为你的系统增加文档。使得模块之间的依赖要很清晰。

处理错误情况。当RPC服务器长时间下线,客户端该如何表现?

如果心存疑惑,那么就不要使用RPC。如果可能的话,你应该使用一个异步的流水线替代RPC式的阻塞,结果被异步地推送到下一个计算平台。

回调队列

通常在RabbitMQ上实现RPC是容易的事情。一个客服端发送请求消息而另一个服务器回应一个响应消息。为了收取响应,我们需要在请求中带上一个回调队列地址。我们可以使用默认队列(它在Java客户端是互斥的)。让我们试试:

callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());

消息属性

AMQP 0-9-1 协议预定义了14个消息属性的集合,但是其中大多数都很少被使用,除了下面的几个属性:

deliveryMode:标记消息是持久化的还是瞬时的。

contentType:用于描述编码的mime-type。比如常用的JSON编码,将contentType设置为application/json是一个很好的实践方式。

replyTo:通常用于标记一个回调队列。

correlationId:用于将RPC响应关联到请求上。

关联Id(correlationId)

在上面的方法中,我们推荐为每一次RPC调用创建一个回调队列。但是这会非常低效,但是幸运的是还有一种更好的方式——让我们为每一个客户端创建一个单独的回调队列。

这会导致一个新问题,收到一个队列中的响应后,很难分清响应属于哪个请求。这正是使用correlationId的好时机。我们打算为每一次请求都设置一个唯一值。之后当我们收到回调队列中的一个消息,我们将查看这个属性,之后基于此我们能够将响应匹配到一个请求上。如果我们看到一个未知的correlationId,我们可以安全地丢弃这些消息,它不属于我们的请求。

你可能会问,为什么我们要无视回调队列中的未知消息,而不是抛出个异常?它可以归因为服务器端的竞争条件。虽然很少发生,但是也有可能RPC服务器在发送响应后,但在为请求发送一个ack信息之前宕机导致。如果这种情况发生,重启的RPC服务器将会再一次处理请求,这也是为什么我们一定要在客户端优雅地处理重复结果,并且RPC系统应该要是幂等的。

摘要

我们的RPC会像这个一样工作。

对于一个RPC请求,客户端发送的消息带上两个属性:replyTo,它的值是一个客户端创建的匿名队列名称,以及correlationId,每个请求都有各自的唯一值。

请求被发送到一个rpc_queue队列。

RPC工作者等待来自队列的请求。当一个请求出现,它处理请求并发送响应给客户端,利用replyTo字段指定的队列。

客户端一直等待回复队列,如果一个消息出现,它会检查消息的correlationId属性。如果它满足来自请求的值,它将响应返回给应用。

放在一起

客户端代码:

channel.exchangeDeclare(EXCHANGE, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE, queueName);
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
channel.basicConsume(queueName, true, (tag, delivery) -> {
            if (!Objects.equals(uuid, delivery.getProperties().getCorrelationId())) {
                return;
            }
            String body = new String(delivery.getBody());
            System.out.println("remote server response with `" + body + "`");
        }, (tag) -> {
});
channel.basicPublish(EXCHANGE, TASK_QUEUE, new AMQP.BasicProperties.Builder()
                        .replyTo(queueName).correlationId(uuid).build(),
                "10".getBytes());
System.out.println("[x] Sent request");
System.in.read();

服务器代码:

channel.exchangeDeclare(EXCHANGE, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE, TASK_QUEUE);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            int k = Integer.parseInt(message);
            int a0 = 0;
            int a1 = 1;
            for (int i = 0; i < k; i++) {
                int tmp = a0;
                a0 = a1;
                a1 = a1 + tmp;
            }
            channel.basicPublish(EXCHANGE, delivery.getProperties().getReplyTo(), new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build(),
                    Integer.toString(a0).getBytes());
            System.out.println("[x] Received `" + message + "` and send `" + a0 + "`");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
System.in.read();