文章目录
前言
RocketMQ的启动主要涉及NameServer、Broker、Producer、Consumer的启动。
这里会简单介绍下这些组件的启动流程,但会捡我认为比较重要的说。
一、NameServer启动流程
NamerServer主要维护Broker的更新和给生产者消费者提供Broker的地址、告诉生产者消费者需不需要更新broker列表缓存。
NamerServer启动分为两步,创建NameSrvController和启动NameSrvController
1.创建NameSrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
- namesrvConfig:NamerServer自身的配置
- nettyServerConfig:NamerServer作为一个Netty服务端,需要的一些配置,例如在该配置中指定了服务端口是
9876
实际上在传入配置后,初始化一些其他的服务类,例如routeInfoManager路由信息管理器
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
this.routeInfoManager = new RouteInfoManager();
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
2.初始化NameSrvController
public boolean initialize() {
this.kvConfigManager.load();
// 初始化netty服务器
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 设置netty服务工作线程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
// 定时任务,每10s检查并移除不活跃的broker
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
// 定时任务,每10s打印一下kv配置
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);
// 此处省略其他的一些代码
}
return true;
}
3.启动NameSrvController
启动NameSrvController主要是通过以下方法
public void start() throws Exception {
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
- 启动了一个nettyServer
- 启动了一个文件监听服务,这样不需要重启服务,就能够实现热配置。
二、Broker 启动流程
Broker启动分为两步,创建BrokerController和启动BrokerController
创建BrokerController
在创建BrokerController的初始阶段,需先解析一些命令参数,例如-c,指定配置文件broker.conf文件,读取进来
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
configFile = file;
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
properties2SystemEnv(properties);
MixAll.properties2Object(properties, brokerConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
MixAll.properties2Object(properties, messageStoreConfig);
BrokerPathConfigHelper.setBrokerConfigPath(file);
in.close();
}
}
创建BrokerController主要是传递配置,包括Broker的配置、Netty服务端和客户端的配置、消息存储的一些配置。
需要注意的是这里Netty服务端好理解,主要用于接收生产者传递的消息,Netty客户端则主要是用于事务消息,需要以client的方式,与生产者进行交互。
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
初始化、启动BrokerController
这里不大片贴代码了,主要是启动了很多定时任务和初始化了很多线程池:
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));
this.putMessageFutureExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPutMessageFutureThreadPoolNums(),
this.brokerConfig.getPutMessageFutureThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.putThreadPoolQueue,
new ThreadFactoryImpl("PutMessageThread_"));
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.replyThreadPoolQueue,
new ThreadFactoryImpl("ProcessReplyMessageThread_"));
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.queryThreadPoolQueue,
new ThreadFactoryImpl("QueryMessageThread_"));
this.adminBrokerExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
"AdminBrokerThread_"));
this.clientManageExecutor = new ThreadPoolExecutor(
this.brokerConfig.getClientManageThreadPoolNums(),
this.brokerConfig.getClientManageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_"));
this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getHeartbeatThreadPoolNums(),
this.brokerConfig.getHeartbeatThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_", true));
this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getEndTransactionThreadPoolNums(),
this.brokerConfig.getEndTransactionThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.endTransactionThreadPoolQueue,
new ThreadFactoryImpl("EndTransactionThread_"));
this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
"ConsumerManageThread_"));
三、消费者消费流程
我们先以PushConsumer来做分析,即推模式的消费者。
在ClientRemotingProcessor类中,消费者会对来的netty请求做分类,这些请求包括检检查事务、通知消费者id变了、消息等类型的请求。
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.CHECK_TRANSACTION_STATE:
return this.checkTransactionState(ctx, request);
case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
return this.notifyConsumerIdsChanged(ctx, request);
case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
return this.resetOffset(ctx, request);
case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
return this.getConsumeStatus(ctx, request);
case RequestCode.GET_CONSUMER_RUNNING_INFO:
return this.getConsumerRunningInfo(ctx, request);
// 重点关注下这里的消息
case RequestCode.CONSUME_MESSAGE_DIRECTLY:
return this.consumeMessageDirectly(ctx, request);
case RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT:
return this.receiveReplyMessage(ctx, request);
default:
break;
}
return null;
}
consumeMessageDirectly是一个接口方法,实现有无序消息和有序消息消费两种:

下面我贴出来代码,其实会发现,就是消费消息,并且根据消费的结果,给broker返回状态值。
public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {
ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult();
// 无序消息,orderly就是false
result.setOrder(false);
// 默认自动提交offset
result.setAutoCommit(true);
List<MessageExt> msgs = new ArrayList<MessageExt>();
msgs.add(msg);
MessageQueue mq = new MessageQueue();
mq.setBrokerName(brokerName);
mq.setTopic(msg.getTopic());
mq.setQueueId(msg.getQueueId());
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(mq);
this.defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, this.consumerGroup);
final long beginTime = System.currentTimeMillis();
log.info("consumeMessageDirectly receive new message: {}", msg);
try {
// 消费消息并返回根据消费结果,设置该消息的最终状态
ConsumeConcurrentlyStatus status = this.messageListener.consumeMessage(msgs, context);
if (status != null) {
switch (status) {
case CONSUME_SUCCESS:
result.setConsumeResult(CMResult.CR_SUCCESS);
break;
case RECONSUME_LATER:
result.setConsumeResult(CMResult.CR_LATER);
break;
default:
break;
}
} else {
result.setConsumeResult(CMResult.CR_RETURN_NULL);
}
} catch (Throwable e) {
result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
result.setRemark(RemotingHelper.exceptionSimpleDesc(e));
log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
mq), e);
}
result.setSpentTimeMills(System.currentTimeMillis() - beginTime);
log.info("consumeMessageDirectly Result: {}", result);
return result;
}
整体看来,像是推过来的netty请求,其中带着Broker发来的相关指令,而这些指令中就带着一些需要消费的消息。
接着来梳理下拉模式的消费者,RocketMQ现在具体实现的类是DefaultLitePullConsumer,原有的DefaultPullConsumer类已经过时,并且将在2022年移除。
RocketMQ服务启动详解:NameServer、Broker与消费者消费流程
本文详细介绍了RocketMQ启动过程中的NameServer创建、初始化与启动,Broker控制器创建及启动,以及消费者消费流程,特别是PushConsumer的消费逻辑。
301

被折叠的 条评论
为什么被折叠?



