RocketMq基础(一)

RocketMq基础

前言

最近,由于业务发展,项目准备引入消息中间件。经过一系列讨论,在加上我们业务偏金融属性,最终选择RocketMq作为消息中间件。所以,趁着有空,学习下如何使用。

架构

rocketmq架构图

这个是从官方文档扣的图,可以看出主要分为四个模块,具体如下:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟

  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式(本质上都是拉模型)对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

  • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯(意思是每个实例存的都是全部的信息)

  • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块:

    1. Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
    2. Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
    3. Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
    4. HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能
    5. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询

rocketmq_architecture_2

消息发送流程

从上面可以看出消息的流转过程:producer->broker->comsumer,下面我就大概分析下消息的传导过程:

生产者(producer)

生产者新建消息,并通过网络发送到broker,这个过程有三种发送的方式,具体如下:

1.同步发送

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        //初始化生产者组
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        //设置namespace server地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //设置重试次数
        producer.setRetryTimesWhenSendFailed(3);
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {
                //创建一个消息实例
                Message msg = new Message("TopicTest" /* 消息主题 */,
                    "TagA" /* 消息标签 */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */
                );
                //同步发送消息
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (UnsupportedEncodingException e){
                e.printStackTrace();
            } catch (MQClientException e){
                e.printStackTrace();
            } catch (RemotingException e){
                e.printStackTrace();
            } catch(MQBrokerException e){
                e.printStackTrace();
            } catch (InterruptedException e){
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

send是一个同步操作,只要不抛出异常,代表消息发送成功。抛出异常,代表发送动作失败,需要对异常消息进行处理,避免消息丢失

发送成功只代表broker收到了消息,broker有没有保存对于producer是不确定的。因此,broker在不同的配置下,会返回不同的响应状态:

  • SendStatus.SEND_OK:
    消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。

  • SendStatus.FLUSH_DISK_TIMEOUT:
    消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果Broker服务器设置了刷盘方式为同步刷盘,即FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。

  • SendStatus.FLUSH_SLAVE_TIMEOUT:
    消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master即ASYNC_MASTER),并且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时。

  • SendStatus.SLAVE_NOT_AVAILABLE:
    消息发送成功,但是此时Slave不可用。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slave Broker服务器,则将返回该状态——无Slave服务器可用。

2.异步发送

public class AsyncProducer {
    public static void main(
        String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
        DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
        producer.start();
        //设置重试次数
        producer.setRetryTimesWhenSendAsyncFailed(3);
        int messageCount = 100;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            try {
                final int index = i;
                Message msg = new Message("Jodie_topic_1023",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                //异步发送消息,主线程不会被阻塞,会立即返回    
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        //消息发送成功
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) {
                        //消息发送失败,可以持久化这条数据,进行补偿
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}

异步发送消息一定要注意重写回调方法,在回调钟对消息结果进行相应的处理。

3.单向发送

public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 100; i++) {
            Message msg = new Message("TopicTest",
                "TagA",
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) 
            );
            producer.sendOneway(msg);
        }
        Thread.sleep(5000);        
        producer.shutdown();
    }
}

单向消息主要用于消息中等可靠性的情况,比如日志发送等场景。

存储(broker)

broker作用是接受从生产者发送的消息,并持久化消息,然后返回响应给producer。

在持久化的过程中,为了消息落盘过程的不丢失和性能,提供了两种刷盘策略:

  • 异步刷盘:默认的策略,消息可能只是被写入了内存的PAGECACHE,写操作的返回快
    吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入
  • 同步刷盘:消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。相比异步刷盘会损失性能,但是更加可靠

消费者(consumer)

消费者主要作用是从broker中拉取消息,然后进行消费(执行特定业务),具体代码如下:

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        // 设置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe("TopicTest", "*");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                //标记该消息稍后重新消费
                //return  ConsumeConcurrentlyStatus.RECONSUME_LATER
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

注意,只有业务执行成功才返回CONSUME_SUCCESS,否则返回RECONSUME_LATER,代表重试消费消息。

目前,我们知道了消息的流转,以及整体架构。下篇文章,我们详细聊聊消息的存储问题。