Rocketmq

Published: by Creative Commons Licence

  • Tags:

引言

Apache RocketMQ是阿里巴巴内部开发的开源分布式消息中间件。RocketMQ是以java开发的应用,并向C++,GO,JAVA三方语言提供了客户端API。

RocketMQ已经捐赠给Apache软件基金会,并于2017年9月25日成为Apache的顶级项目。

安装

安装RocketMQ

由于RocketMQ是以java开发的,因此安装非常简单。

首先前往RocketMQ的github仓库拉取代码。

之后进入项目根目录,并利用下面的maven指令编译打包代码:

> mvn -Prelease-all -DskipTests clean install -U

distribution/target/apache-rocketmq移动到自己的安装目录。

启 动NameServer

> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

启动Broker

在启动前可以先打开bin/runbroker.sh修改JVM参数,其中比较重要的就是JVM的内存设置。

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"

接下来执行下面命令启动broker:

> nohup sh bin/mqbroker -n localhost:9876 &
> tail -f ~/logs/rocketmqlogs/broker.log 
The broker[%s, 172.30.30.233:10911] boot success...

测试是否启动成功

在测试之前可以通过设置环境变量让测试脚本能找到NameServer:

> export NAMESRV_ADDR=localhost:9876

之后进行测试:

> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId= ...

> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...

测试还是挺重要的,我第一次就测失败了,原因是忘了启动broker(捂脸)。

安装RocketMQ-Console

RocketMQ Externals包含了大量由社区提交的项目,用于增强Apache RocketMQ。

RocketMQ-Console项目是基于spring-boot开发的RocketMQ的WEB控制台。

我是使用docker启动的:

docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng

如果没有不使用docker的话,就需要自己手动去github仓库拉代码,并手动编译后启动:

mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar

如果连不上NameServer的话可以先检查一下防火墙。

API案例

同步发送消息

public class Main {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("daltao");
        producer.setNamesrvAddr("192.168.1.6:9876");
        producer.start();
        for (int i = 0; i < 100; i++) {
            Message message = new Message("TopicTest", "TagA", ("Hello RocketMQ-" + i).getBytes(Charset.forName("utf8")));
            SendResult result = producer.send(message);
            System.out.println(result);
        }

        producer.shutdown();
    }
}

异步发送消息

public class Main {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("daltao");
        producer.setNamesrvAddr("192.168.1.6:9876");
        producer.start();

        CountDownLatch countDownLatch = new CountDownLatch(100);
        for (int i = 0; i < 100; i++) {
            Message message = new Message("TopicTest", "TagB", "Order-" + i, ("Hello RocketMQ-" + i).getBytes(Charset.forName("utf8")));
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                    countDownLatch.countDown();
                }

                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                    countDownLatch.countDown();
                }
            });
        }

        countDownLatch.await();
        producer.shutdown();
    }
}

单路发送消息

单路发送消息就是指仅发送消息而不用等待服务器对消息的回应,类似于udp协议,其优点是最大化了吞吐性能,但是缺点就是可能会发生消息丢失。

public class Main {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("daltao");
        consumer.setNamesrvAddr("192.168.1.6:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(MessageFormat.format("{0} received message - {1} ", Thread.currentThread().getName(), msgs.stream().map(x -> new String(x.getBody(), Charset.forName("utf8"))).collect(Collectors.toList())));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Start consuming");
    }
}

并发消费消息

public class Main {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("daltao");
        consumer.setNamesrvAddr("192.168.1.6:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(MessageFormat.format("{0} received message - {1} ", Thread.currentThread().getName(), msgs.stream().map(x -> new String(x.getBody(), Charset.forName("utf8"))).collect(Collectors.toList())));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Start consuming");
    }
}

顺序消费消息

RocketMQ仅支持同一队列顺序消费,因此如果要求顺序消费,需要生产者将关联消息投递到同一队列中。

生产者代码(我们将同一订单的消息投递到相同队列中):

public class Main {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("daltao");
        consumer.setNamesrvAddr("192.168.1.6:9876");
        consumer.subscribe("TopicTest", "TagF");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        AtomicInteger consumeTimes = new AtomicInteger(0);
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                int t = consumeTimes.incrementAndGet();
                System.out.printf("%s received new message status %d: %s%n", Thread.currentThread().getName(), t % 2, msgs.stream().map(x -> new String(x.getBody(), Charset.forName("utf8"))).collect(Collectors.toList()));
                switch (t % 2) {
                    case 0:
                        return ConsumeOrderlyStatus.SUCCESS;
                    case 1:
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    default:
                        return ConsumeOrderlyStatus.SUCCESS;
                }
            }
        });

        consumer.start();
        System.out.println("Start consuming");
    }
}

消费者代码:

public class Main {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("daltao");
        consumer.setNamesrvAddr("192.168.1.6:9876");
        consumer.subscribe("TopicTest", "TagF");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        AtomicInteger consumeTimes = new AtomicInteger(0);
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                int t = consumeTimes.incrementAndGet();
                System.out.printf("%s received new message status %d: %s%n", Thread.currentThread().getName(), t % 2, msgs.stream().map(x -> new String(x.getBody(), Charset.forName("utf8"))).collect(Collectors.toList()));
                switch (t % 2) {
                    case 0:
                        return ConsumeOrderlyStatus.COMMIT;
                    case 1:
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    default:
                        return ConsumeOrderlyStatus.SUCCESS;
                }
            }
        });

        consumer.start();
        System.out.println("Start consuming");
    }
}

广播模式消费

所有投递的消息都会发送给所有处于广播模式下的订阅者(一个消息发送给多个订阅者)。

生产者代码:

public class Main {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("daltao");
        producer.setNamesrvAddr("192.168.1.6:9876");
        producer.start();
        for (int i = 0; i < 100; i++) {
            Message message = new Message("TopicTest", "TagF", "OrderID188", ("Hello RocketMQ-" + i).getBytes(Charset.forName("utf8")));
            SendResult result = producer.send(message);
            System.out.println(result);
        }
        producer.shutdown();
    }
}

消费者代码:

public class RocketMQStudy8 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("daltao");
        consumer.setNamesrvAddr("192.168.1.6:9876");
        consumer.subscribe("TopicTest", "TagF");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(MessageFormat.format("{0} received message - {1} ", Thread.currentThread().getName(), msgs.stream().map(x -> new String(x.getBody(), Charset.forName("utf8"))).collect(Collectors.toList())));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Start consuming");
    }
}

延迟消息

定时消息是指定了延迟时间的消息,该消息会在延迟时间后才会被真正投递。

生产者代码:


消费者代码:

public class Main {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("daltao");
        consumer.setNamesrvAddr("192.168.1.6:9876");
        consumer.subscribe("TopicTest", "TagF");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(MessageFormat.format("{0} received message - {1} ", Thread.currentThread().getName(), msgs.stream().map(x -> new String(x.getBody(), Charset.forName("utf8"))).collect(Collectors.toList())));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Start consuming");
    }
}

批量消息

对于多个较小的消息,我们可以把它们合成一批发送,这样可以提高性能。

同一批的消息,必须有相同的主题,相同的waitStoreMsgOK,并且不支持延迟。除此之外,一批消息的大小之和不能超过1MB。

生产者代码:

public class Main {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("daltao");
        producer.setNamesrvAddr("192.168.1.6:9876");
        producer.setRetryTimesWhenSendFailed(0);
        producer.start();

        String topic = "TopicTest";
        Charset charset = Charset.forName("utf8");
        List<Message> messageList = Arrays.asList(
                new Message(topic, "BatchTest", "OrderID001", "Hello RocketMQ-1".getBytes(charset)),
                new Message(topic, "BatchTest", "OrderID002", "Hello RocketMQ-2".getBytes(charset)),
                new Message(topic, "BatchTest", "OrderID003", "Hello RocketMQ-3".getBytes(charset))
        );
        producer.send(messageList);

        producer.shutdown();
    }
}

消费者代码:

public class Main {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("daltao");
        consumer.setNamesrvAddr("192.168.1.6:9876");
        consumer.subscribe("TopicTest", "BatchTest");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(MessageFormat.format("{0} received message - {1} ", Thread.currentThread().getName(), msgs.stream().map(x -> new String(x.getBody(), Charset.forName("utf8"))).collect(Collectors.toList())));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Start consuming");
    }
}

过滤消息

大多数情况,我们利用tag来过滤消息。但是一个消息最多只能有一个tag,有些时候就无法满足我们的需求。

RocketMQ允许我们使用SQL的语法来过滤消息。

我们可以利用Message#putUserProperty来为消息增加用户属性,之后消费者端就可以根据这些属性过滤消息。

RocketMQ仅支持了一些基础语法,其中包括:

  1. 数值比较,比如>, >=, <, <=,BETWEEN,=;
  2. 字符比较,比如=, <>, IN;

  3. IS NULL 或者IS NOT NULL;
  4. 逻辑运算符AND, OR, NOT;

字面量类型包括:

  1. 数值,比如123, 3.1415;
  2. 字符,比如'abc',必须包围在单引号之间;
  3. NULL,特殊常量
  4. 布尔常量,TRUEFALSE

生产者代码:

public class Main {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("daltao");
        producer.setNamesrvAddr("192.168.1.6:9876");
        producer.setRetryTimesWhenSendFailed(0);
        producer.start();

        String topic = "TopicTest";
        Charset charset = Charset.forName("utf8");
        Message msg1 = new Message(topic, "SqlTest", "OrderID001", "Hello RocketMQ-1".getBytes(charset));
        msg1.putUserProperty("sequence", "1");
        msg1.putUserProperty("tag", "SqlTest");
        Message msg2 = new Message(topic, "SqlTest", "OrderID002", "Hello RocketMQ-2".getBytes(charset));
        msg2.putUserProperty("sequence", "2");
        msg2.putUserProperty("tag", "SqlTest");
        Message msg3 = new Message(topic, "SqlTest", "OrderID003", "Hello RocketMQ-3".getBytes(charset));
        msg3.putUserProperty("sequence", "3");
        msg3.putUserProperty("tag", "SqlTest");
        List<Message> messageList = Arrays.asList(
                msg1, msg2, msg3
        );
        producer.send(messageList);

        producer.shutdown();
    }
}

消费者代码:

public class Main {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("daltao");
        consumer.setNamesrvAddr("192.168.1.6:9876");
        consumer.subscribe("TopicTest", MessageSelector.bySql("sequence between 2 and 3 and tag='SqlTest'"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(MessageFormat.format("{0} received message - {1} ", Thread.currentThread().getName(), msgs.stream().map(x -> new String(x.getBody(), Charset.forName("utf8"))).collect(Collectors.toList())));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Start consuming");
    }
}

如果在执行的时候消费端报出 The broker does not support consumer to filter message by SQL92异常,你需要在broker.conf配置文件中增加enablePropertyFilter = true,在启动broker的时候用-c conf/broker.conf来指定使用的broker端配置文件

事务消息

在RocketMQ中支持事务消息,消息能与本地事务绑定,一起提交和回滚。由于可能在提交了本地事务但是在提交RocketMQ事务之前宕机,因此需要为RocketMQ提供一个回调接口用于查询事务是否执行成功。

特点:

  • 事务消息不能延迟发送
  • 事务消息不能批量发送
  • 事务级别消息默认会被回查15次,可以通过配置transactionCheckMax进行配置。如果超过限制还无法得到事务的正确状态,那么消息将被丢弃。
  • 如果本地事务执行过久,那么在transactionTimeout毫秒后将开始回查操作。
  • 事务级消息可能会被重复回查以及重复消费,需要业务端自己做控制。
  • RocketMQ根据生产者ID回查消息,比如producer-1发送了事务消息但是在提交之前宕机了,当它作为producer-1重新连接RocketMQ后,RocketMQ将从它哪里回查之前发送的消息的事务状态。

生产者:

public class Main {
    public static void main(String[] args) throws Exception {
        TransactionListener transactionListener = new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                System.out.println("SUCCESS");
                return LocalTransactionState.UNKNOW;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("CHECK " + new String(msg.getBody(), Charset.forName("utf8")));
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        };

        TransactionMQProducer producer = new TransactionMQProducer("daltao");
        producer.setNamesrvAddr("192.168.1.6:9876");
        producer.setTransactionListener(transactionListener);
        producer.setExecutorService(Executors.newFixedThreadPool(4));
        producer.start();

        producer.sendMessageInTransaction(new Message("TopicTest", "transaction", ("1").getBytes(Charset.forName("utf8"))), null);
        producer.sendMessageInTransaction(new Message("TopicTest", "transaction", ("2").getBytes(Charset.forName("utf8"))), null);
        Thread.sleep(30000);
        producer.shutdown();
    }
}

消费者:

public class Main {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("daltao");
        consumer.setNamesrvAddr("192.168.1.6:9876");
        consumer.subscribe("TopicTest", "transaction");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(MessageFormat.format("{0} received message - {1} ", Thread.currentThread().getName(), msgs.stream().map(x -> new String(x.getBody(), Charset.forName("utf8"))).collect(Collectors.toList())));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Start consuming");
    }
}

架构设计

Apache RocketMQ是一个分布式消息和流平台,拥有低延迟,高性能和可靠性,万亿级容量和高灵活性等特性。它由四部分组成:name servers,brokers,producers和consumers。每一部分都可以水平扩展,无单点故障问题。

NameServer

NameServers提供轻量级的服务发现和路由。每一个NameServer都记录了全部的路由信息,提供对应的读写服务,并且支持快速存储扩大。

NameServer主要包含下面两个特性:

  • Broker管理,NameServer接受来自Broker集群的注册请求,并且提供了心跳检测机制用于检查broker是否存活。
  • 路由管理,每一个NameServer都会持有所有的关于broker集群和queue的信息,用于客户端查询。

众所周知,RocketMQ客户端会从NameServer查询队列路由信息,但是客户端如何找到NameServer的地址。有四种方法:

  1. 编程的方法,直接在代码中指定
  2. JVM系统属性,rocketmq.namesrv.addr
  3. 系统环境变量,NAMESRV_ADDR
  4. HTTP终端

Broker

Brokers通过提供轻量级的TOPIC和QUEUE机制来管理消息存储。它们支持PUSH和PULL模型,包含错误容忍机制(2份或3份副本)。Brokers还提供了容灾回复,多metrics统计数据,和报警机制,这些都是传统消息系统所缺失的。

Broker服务负责消息存储和递送,消息查询,高可用性保障等等。

Broker服务有一些重要模块:

  • 远程模块,broker的入口,处理来自客户端的请求
  • 客户端管理,管理客户端并维护消费者的主题订阅。
  • 存储服务,提供简单的API用于存储和查询在物理磁盘上的消息。
  • 高可用性(HA)服务,提供在主从节点之间的数据同步特性
  • 索引服务,通过指定的key为消息建立索引,并且提供快速消息查询。

Producer

Producers支持分布式部署,分布式Producers通过多重负载均衡模式向Broker集群发送消息。发送消息过程支持快速失败并且有着低延迟。

Consumer

Consumers支持使用PUSH和PULL模型,并且支持分布式部署。它也支持集群消费和消息广播。它提供了实时消息订阅机制并且能满足大多数消费者需求。RocketMQ的网站为感兴趣的用户提供了一个简单的quick-start教程。

概念

Producer

一个Producer发送由业务应用系统生成的消息到brokers。RocketMQ为发送提供了多种范例:同步,异步和单路。

Producer Group

相同角色的Producers被分在同一个组中。在发送普通消息时,仅作为标识使用。对于事务消息,如果某条消息的发送者在确定提交状态之前宕机,事务将一直处于UNKNOWN状态并超时,则brker会回查同一个group的其他producer,确认这条消息的具体提交状态。

Consumer

一个Consumer从brokers中拉取消息并且提供给应用程序。在用户程序的视角看来,消费者分成两种:

PullConsumer

Pull Consumer主动从brokers中拉取消息。一旦拉取到批量消息,用户程序就可以开始对消息进行处理。

PushConsumer

Push consumer, 在另一方面,封装了拉取消息,消费消息的过程,并且还在其内部维护了其他工作,仅留出了一个回调接口供终端用户去实现,以决定当消息抵达时具体的行为。

Consumer Group

类似于之前提到的producer group。相同角色的消费者被分到同一个组中。

Consumer Group是一个很大的概念,实现了负载均衡和容错的目标,而对于消费消费过程,却又是及其容易。

警告:相同分组的消费者必须订阅相同的TOPIC。

Topic

Topic是一个分类,producers向其投递消息,consumers从中拉取消息。Topic与producers和consumers维持很松的关联(耦合低)。特别的,可以有任意数目producers发送消息给同一个topic,对应的,一个producer也可以同时向多个topic发送消息。在consumer的视角,一个topic可能被任意数目consumer group所订阅。同样的,对于一个consumer group,可以同时订阅一个或更多的topics,只要这个group能保持它们的订阅一致即可。

Message

Message是被递送的信息。一个消息一定有一个Topic,Topic可以被认作是你的信件的收件地址。一个消息也可以有一个可选的tag和额外的键值对。比如你可以为你的消息设置一个业务key,并且在broker服务器上通过这个key查找message,从而在开发过程中可以进行调试诊断。

Message Queue

Topic可以被细分成多个子topic—消息队列。

Tag

Tag,换言之就是子topic,为用户提供额外的灵活性。带着tag,来自相同业务模块的不同目的的消息允许拥有相同的topic但是不同的tag。Tags有助于帮助你保持代码的清洁和连贯性,并且tag还能增强RocketMQ提供的查询系统的功能。

Broker

Broker是RocketMQ系统的主要模块。它接收来自producer的消息,保存它们并处理来自consumer的pull请求。它也会存储消息关联的元数据,包含消费者groups,消费过程的offset和topic/queue信息。

Name Server

Name server作为路由信息提供者向外部提供服务。Producer/Consumer客户端通过Name server和主题查找对应的broker列表。

Message Model

  • 集群
  • 广播

Message Order

当采用DefaultMQPushConsumer,你可以决定是有序消费还是并发消费。

  • 有序

对于每一个消息队列,有序消费消息意味着消息被消费的顺序与它们被producer发送到该队列的顺序一致。如果你处理的场景强制要求TOPIC全局有序,确保你是用的TOPIC只有一个消息队列。

警告:如果指定了有序消费,消费消息的最大并发为被消费者群组所订阅的消息队列数。

  • 并发

当并发地消费消息时,消费消息的最大并发仅被每个客户端的线程池的大小所限制。

警告:在当前模式,消息顺序不再被保证。

部署

Broker

Role

Broker角色分为ASYNC_MASTER、SYNC_MASTER或SLAVE。如果你无法容忍消息丢失,我们建议你部署SYNC_MASTER并且为它追加一个SLAVE。但是如果你认为丢失也可以接受,并且如果你希望Broker总是可用,你可以部署ASYNC_MASTER以及SLAVE。

FlushDiskType

ASYNC_FLUSH是推荐的方式,而SYNC_FLUSH是昂贵的刷盘方式并且会造成性能降低。如果你想要可靠性,我们推荐你使用SYNC_MASTER加上SLAVE。

Producer

SendStatus

当发送一条消息,你将会得到一个SendResult,其中包含了SendStatus。首先我们假设消息的isWaitStoreMsgOK=true(默认值为true)。否则我们将在没有异常抛出的情况下总是得到SEND_OK。下面描述每一个状态:

FLUSH_DISK_TIMEOUT

如果Broker设置MessageStoreConfig的FlushDiskType=SYNC_FLUSH(默认是ASYNC_FLUSH),并且Broker没能在MessageStoreConfig的syncFlushTimeout(默认是5秒),你将会得到该状态。

FLUSH_SLAVE_TIMEOUT

如果Broker的角色是SYNC_MASTER(默认是ASYNC_MASTER),并且slave Broker无法在MessageStoreConfig的syncFlushTimeOut(默认是5秒)内完成与master的同步,你将得到这个状态。

SLAVE_NOT_AVAILABLE

如果Broker的角色是SYNC_MASTER(默认是ASYNC_MASTER),但是没有配置slave broker,你将得到这个状态。

SEND_OK

SEND_OK并不意味它是可靠的,要保证没有消息丢失,你应该同时启动SYNC_MASTER或者SYNC_FLUSH。

重复或丢失

如果你得到FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT同时broker宕机了,你可能会发现消息丢失了。此时,你有两个选择,一个是放过它,之后消息永久丢失。另一种方式是重新发送消息,当然消息可能会发生重复。通常我们建议重复发送消息并且找到一个处理重复消费的方法。除非你认为丢失一部分消息无伤大雅。但是要记住当你得到SLAVE_NOT_AVAILABLE时重复发送的用处很小,你应该保留该场景并通知集群管理员。

Timeout

客户端发送请求到broker ,之后等待相响应,但是如果等待超过了最大等待时间并且没有响应返回,客户端将抛出一个RemotingTimeoutException。默认等待时间时3秒。你也可以通过调用send(msg, timeout)显式指定超时时间。注意我们不建议timeout太小,因为broker需要一定的时间落盘以及与slave同步。当然如果该值超过syncFlushTimeout过多也没有意义,因为broker可能在timeout之前返回一个带着FLUSH_SLAVE_TIMEOUT或FLUSH_SLAVE_TIMEOUT状态的响应。

消息大小

我们建议消息的大小应该不超过512K。

异步发送

默认send方法会阻塞直到得到响应或超时,所以如果你看重性能表现,我们推荐使用send(msg, callback),它会使用异步的方式处理响应。

Producer群组

通常,producer群组是无效的。但是如果你被卷入事务中,你就应该关注它。默认,在相同的JVM中你最多只能为一个群组提供一个producer,这通常是足够的。

线程安全性

Producer是线程安全的,你可以直接将其作为业务解决方案。

性能表现

如果你希望在一个JVM上构建多个producer,用于处理大数据量,我们建议:

  • 用较少的producers,使用异步发送的方式发送
  • 为每个producer调用setInstanceName

Consumer

Consumer分组和订阅

你首先应该意识到不同的consumer群组能独立地消费同一个Topic,并且它们都会有自己独立的消费offset。请确保相同群组的consumer订阅了相同的topics。

MessageListener

Orderly

消费者将会锁住每一个消息队列以确保消息的消费顺序是一个接一个,是有序的。这将会带来性能损耗,但是在你重视消息顺序时相当有用。抛出异常不是一种推荐的方式,你应该取而代之返回ConsumerOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT。

Concurrently

就像名字所暗示的,消费者会并发地消费消息。这是推荐的获得较好性能表现的方式,同样不推荐抛出异常的方式,你应该返回ConsumerConcurrentlyStatus.RECONSUME_LATER。

Consume Status

对于MessageListenerConcurrently,你可以返回RECONSUME_LATER,表示你现在无法消费它,希望之后能进行重新消费。这样你就可以继续消费其它的消息。对于MessageListenerOrderly,由于你关注顺序性,你不能直接跳过个别消息,但是你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT以通知consumer去稍等片刻。

阻塞

不推荐阻塞Listener,因为这样会阻塞线程池,最终可能导致消费过程的退出。

线程数

Consumer内部使用ThreadPoolExecutor在内部处理消费,你可以通过调用setConsumerThreadMin或setConsumeThreadMax来改变它。

ConsumeFromWhere

当建议了一个新的消费群组,它将需要决定是否要消费broker中残留的历史消息。CONSUME_FROM_LAST_OFFSET将会忽略历史信息,仅消费之后生产的消息。CONSUME_FROM_FIRST_OFFSET将会消费broker中所有的消息。你也可以使用CONSUME_FROM_TIMESTAMP来消费在某个特定时间点之后生产的消息。

重复

许多场景都可能导致重复,比如:

  • Producer重复发送消息
  • 消费者宕机导致一些broker中的offset没有及时更新。

因此假如你不能容忍消息重复,你就需要做一些外部工作来处理重复消息。比如说,检查你数据库中的主键。

NameServer

在Apache RocketMQ中,name servers被设计用来协调分布式系统中的组件,协调工作主要是通过组织topic路由信息实现的。

管理内容分为两部分:

  • Brokers周期性更换保存在每个name server中的元数据。
  • Name servers向客户端(包括消费者,生产者)提供最新的路由信息。