RocketMq基础
前言
最近,由于业务发展,项目准备引入消息中间件。经过一系列讨论,在加上我们业务偏金融属性,最终选择RocketMq作为消息中间件。所以,趁着有空,学习下如何使用。
架构
这个是从官方文档扣的图,可以看出主要分为四个模块,具体如下:
Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟
Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式(本质上都是拉模型)对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。NameServer通常也是集群的方式部署,
各实例间相互不进行信息通讯
(意思是每个实例存的都是全部的信息)BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块:
- Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
- Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
- Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
- HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能
- Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询
消息发送流程
从上面可以看出消息的流转过程:producer->broker->comsumer,下面我就大概分析下消息的传导过程:
生产者(producer)
生产者新建消息,并通过网络发送到broker,这个过程有三种发送的方式,具体如下:
1.同步发送
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//初始化生产者组
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
//设置namespace server地址
producer.setNamesrvAddr("127.0.0.1:9876");
//设置重试次数
producer.setRetryTimesWhenSendFailed(3);
producer.start();
for (int i = 0; i < 1000; i++) {
try {
//创建一个消息实例
Message msg = new Message("TopicTest" /* 消息主题 */,
"TagA" /* 消息标签 */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */
);
//同步发送消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (UnsupportedEncodingException e){
e.printStackTrace();
} catch (MQClientException e){
e.printStackTrace();
} catch (RemotingException e){
e.printStackTrace();
} catch(MQBrokerException e){
e.printStackTrace();
} catch (InterruptedException e){
e.printStackTrace();
}
}
producer.shutdown();
}
}
send
是一个同步操作,只要不抛出异常,代表消息发送成功
。抛出异常,代表发送动作失败,需要对异常消息进行处理,避免消息丢失
发送成功
只代表broker收到了消息,broker有没有保存对于producer是不确定的。因此,broker在不同的配置下,会返回不同的响应状态:
SendStatus.SEND_OK:
消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。SendStatus.FLUSH_DISK_TIMEOUT:
消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果Broker服务器设置了刷盘方式为同步刷盘,即FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。SendStatus.FLUSH_SLAVE_TIMEOUT:
消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master即ASYNC_MASTER),并且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时。SendStatus.SLAVE_NOT_AVAILABLE:
消息发送成功,但是此时Slave不可用。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slave Broker服务器,则将返回该状态——无Slave服务器可用。
2.异步发送
public class AsyncProducer {
public static void main(
String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
producer.start();
//设置重试次数
producer.setRetryTimesWhenSendAsyncFailed(3);
int messageCount = 100;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message("Jodie_topic_1023",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//异步发送消息,主线程不会被阻塞,会立即返回
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
//消息发送成功
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
//消息发送失败,可以持久化这条数据,进行补偿
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
异步发送消息
一定要注意重写回调方法,在回调钟对消息结果进行相应的处理。
3.单向发送
public class OnewayProducer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Message msg = new Message("TopicTest",
"TagA",
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
producer.sendOneway(msg);
}
Thread.sleep(5000);
producer.shutdown();
}
}
单向消息
主要用于消息中等可靠性的情况,比如日志发送等场景。
存储(broker)
broker作用是接受从生产者发送的消息,并持久化消息,然后返回响应给producer。
在持久化的过程中,为了消息落盘过程的不丢失和性能,提供了两种刷盘策略:
- 异步刷盘:默认的策略,消息可能只是被写入了内存的PAGECACHE,写操作的返回快
吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入 - 同步刷盘:消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。相比异步刷盘会损失性能,但是更加可靠
消费者(consumer)
消费者主要作用是从broker中拉取消息,然后进行消费(执行特定业务),具体代码如下:
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 设置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("TopicTest", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
//标记该消息稍后重新消费
//return ConsumeConcurrentlyStatus.RECONSUME_LATER
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
注意
,只有业务执行成功才返回CONSUME_SUCCESS
,否则返回RECONSUME_LATER
,代表重试消费消息。
目前,我们知道了消息的流转,以及整体架构。下篇文章,我们详细聊聊消息的存储问题。