# roketmq背景
- 详情查看工具-mq-rocketmq章节
# 通信基础设施设计
# 服务端
/*xxx: 远程服务*/
public interface RemotingService {
//xxx: 启动服务
void start();
//xxx: 关闭服务
void shutdown();
void registerRPCHook(RPCHook rpcHook);
/**
* Remove all rpc hooks.
*/
void clearRPCHook();
}
/*xxx: rocketMQ的一个网络通信模块,是实现高并发,高性能的关键之一*/
//xxx: 其作用有: 处理网络请求,处理网络响应,处理网络连接,维护处理的线程池
//xxx: 用于支持远程计算的服务器
public interface RemotingServer extends RemotingService {
//xxx: 绑定处理器
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
/*xxx: 获取本地监听端口*/
int localListenPort();
//xxx: 创建远程服务实例
RemotingServer newRemotingServer(int port);
/*xxx: 同步执行,执行结果同步返回*/
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException;
/*xxx: 异步执行,执行结果通过异步回调的方式返回*/
void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
/*xxx: 单步执行, 即客户端调用之后,不关心服务是否成功或者失败*/
void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException;
}
# 客户端
/*xxx: 远程服务*/
public interface RemotingService {
//xxx: 启动服务
void start();
//xxx: 关闭服务
void shutdown();
void registerRPCHook(RPCHook rpcHook);
/**
* Remove all rpc hooks.
*/
void clearRPCHook();
}
/*xxx: 用于向 broker发送网络请求, 实现消息的发送,消费和管理的功能 , 同时负责接收 来自broker的响应 */
//xxx: 用于支持远程计算的客户端组件
public interface RemotingClient extends RemotingService {
/*xxx: 获取nameServer的地址*/
List<String> getNameServerAddressList();
/*xxx: 获取活跃的namesrv服务列表*/
List<String> getAvailableNameSrvList();
/*xxx: 同步调用*/
RemotingCommand invokeSync(final String addr, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException;
/*xxx: 异步调用*/
void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
/*xxx:单向调用 */
void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException;
/*xxx: 绑定处理器*/
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
/*xxx: 关闭信道 */
void closeChannels(final List<String> addrList);
}
# 通信编码
/*xxx: 关于 rocketmq的所有操作请求码 定义 */
/*xxx: 目前定义了137种操作 */
public class RequestCode {
/*xxx: 发送消息*/
public static final int SEND_MESSAGE = 10;
/*xxx: 拉取消息*/
public static final int PULL_MESSAGE = 11;
/*xxx: 查询消息*/
public static final int QUERY_MESSAGE = 12;
public static final int QUERY_BROKER_OFFSET = 13;
/*xxx: 查询消费者位移 */
public static final int QUERY_CONSUMER_OFFSET = 14;
/*xxx: 更新消费者位移 */
public static final int UPDATE_CONSUMER_OFFSET = 15;
/*xxx: 创建或更新 topic命令 */
public static final int UPDATE_AND_CREATE_TOPIC = 17;
/*xxx: 获取所有的 topic信息*/
public static final int GET_ALL_TOPIC_CONFIG = 21;
//xxx: 省略其它抽象...
/*XXX: 客户端心跳, 完成了生产者,消费者的注册*/
public static final int HEART_BEAT = 34;
/*xxx: 完成了 客户端的下线, 包括 生产者,消费者的下线 */
public static final int UNREGISTER_CLIENT = 35;
/*xxx: 获取分组中的消费者 */
public static final int GET_CONSUMER_LIST_BY_GROUP = 38;
}
# 通信处理器
# namesrv-服务端
public class NamesrvController {
private void registerProcessor() {
/*xxx: 初始化客户端请求处理器,并进行绑定*/
ClientRequestProcessor clientRequestProcessor = new ClientRequestProcessor(this);
/*xxx: 将客户端请求处理器 与 服务器 相绑定 */
this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor);
/*xxx: 绑定 默认处理器*/
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);
}
}
# 客户端请求处理器
/*xxx: 客户端请求处理器 */
public class ClientRequestProcessor implements NettyRequestProcessor {
@Override
/*xxx: 处理请求 */
public RemotingCommand processRequest(final ChannelHandlerContext ctx,
final RemotingCommand request) throws Exception {
return this.getRouteInfoByTopic(ctx, request);
}
/*xxx: 通过topic获取broker路由信息,包括消息队列 */
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
/*xxx: 创建远程计算响应 */
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
/*xxx: 解析请求头 */
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
/*xxx: 通过topic获取 路由数据 */
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
byte[] content = topicRouteData.encode();
/*xxx: 将路由数据进行返回 */
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
}
# 默认请求处理器
public class DefaultRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return this.queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
return this.registerBroker(ctx, request);
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.BROKER_HEARTBEAT:
return this.brokerHeartbeat(ctx, request);
case RequestCode.GET_BROKER_MEMBER_GROUP:
return this.getBrokerMemberGroup(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.ADD_WRITE_PERM_OF_BROKER:
return this.addWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return this.getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return this.deleteTopicInNamesrv(ctx, request);
case RequestCode.REGISTER_TOPIC_IN_NAMESRV:
return this.registerTopicToNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
case RequestCode.GET_CLIENT_CONFIG:
return this.getClientConfigs(ctx, request);
default:
String error = " request type " + request.getCode() + " not supported";
return RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
}
//xxx: 省略其它抽象...
}
}
# namesrv-客户端
/*xxx: namesrv控制器, namesrv的核心组件 */
/*xxx: 该组件存在核心的声明周期方法: 初始化, 启动,以及 关闭*/
public class NamesrvController {
//xxx: 启动namesrv控制器 (启动联网组件: remotingServer, remotingClient, )
public void start() throws Exception {
this.remotingClient.updateNameServerAddressList(Collections.singletonList(NetworkUtil.getLocalAddress()
+ ":" + nettyServerConfig.getListenPort()));
//xxx: 启动客户端服务
this.remotingClient.start();
}
}
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
@Override
public void start() {
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class);
//xxx: 省略其它抽象...
TimerTask timerTaskScanResponseTable = new TimerTask() {
@Override
public void run(Timeout timeout) {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
LOGGER.error("scanResponseTable exception", e);
} finally {
/*xxx: 每隔1秒钟, 扫描并执行异步请求的 响应回调 */
timer.newTimeout(this, 1000, TimeUnit.MILLISECONDS);
}
}
};
/*xxx: 3秒钟后, 执行响应回调 */
this.timer.newTimeout(timerTaskScanResponseTable, 1000 * 3, TimeUnit.MILLISECONDS);
int connectTimeoutMillis = this.nettyClientConfig.getConnectTimeoutMillis();
TimerTask timerTaskScanAvailableNameSrv = new TimerTask() {
@Override
public void run(Timeout timeout) {
try {
NettyRemotingClient.this.scanAvailableNameSrv();
} catch (Exception e) {
LOGGER.error("scanAvailableNameSrv exception", e);
} finally {
/*xxx: 每隔3秒钟,检测 namesrv的可用性 并进行失活处理 */
timer.newTimeout(this, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
}
};
/*xxx: 每隔3秒钟, */
this.timer.newTimeout(timerTaskScanAvailableNameSrv, 0, TimeUnit.MILLISECONDS);
}
}
# broker-服务端
public class BrokerController {
public void registerProcessor() {
/*xxx: 发送消息钩子*/
sendMessageProcessor.registerSendMessageHook(sendMessageHookList);
//xxx: 消费消息钩子
sendMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
//xxx: 将发送消息处理器 与 服务进行绑定
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
/*xxx: 拉取消息处理器 与 服务端继续宁绑定 */
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.LITE_PULL_MESSAGE, this.pullMessageProcessor, this.litePullMessageExecutor);
/*xxx: 消费消息钩子 */
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/*xxx: peek消息处理器*/
this.remotingServer.registerProcessor(RequestCode.PEEK_MESSAGE, this.peekMessageProcessor, this.pullMessageExecutor);
/*xxx: pop消息处理器 */
this.remotingServer.registerProcessor(RequestCode.POP_MESSAGE, this.popMessageProcessor, this.pullMessageExecutor);
/*xxx: 消息确认处理器 */
this.remotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor);
/*xxx: 消息重新消费处理器 */
this.remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
/*xxx: 绑定 通知处理器 */
this.remotingServer.registerProcessor(RequestCode.NOTIFICATION, this.notificationProcessor, this.pullMessageExecutor);
/*xxx: poll方式消费消息处理器 */
this.remotingServer.registerProcessor(RequestCode.POLLING_INFO, this.pollingInfoProcessor, this.pullMessageExecutor);
/*xxx: 绑定 响应消息处理器 */
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
/*xxx: 绑定查询消息处理器 */
NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
/*xxx: 绑定客户端管理处理器 */
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);
/*xxx: 绑定消费者管理处理器 */
ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
/*xxx: 绑定 负载均衡处理器 */
this.remotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, queryAssignmentProcessor, loadBalanceExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, queryAssignmentProcessor, loadBalanceExecutor);
this.remotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, queryAssignmentProcessor, loadBalanceExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, queryAssignmentProcessor, loadBalanceExecutor);
/*xxx: 绑定 事务处理器 */
this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor);
/*xxx: 默认处理器*/
AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}
}
# 发送消息处理器
/*xxx: 发送消息处理器 */
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext sendMessageContext;
switch (request.getCode()) {
/*xxx: 消费端用于将消费失败的消息重新发送到Broker*/
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return null;
}
/*xxx: 从 主题队列映射管理器中,获取主题队列映射信息 */
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true);
RemotingCommand response;
/*xxx: 调用发送消息的方法 */
/*xxx: 发送完成后,会执行发送消息的后置钩子函数 */
if (requestHeader.isBatch()) {
response = this.sendBatchMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
(ctx1, response1) -> executeSendMessageHookAfter(response1, ctx1));
} else {
response = this.sendMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
(ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));
}
return response;
}
}
/*xxx: 实际发送的方法 */
public RemotingCommand sendMessage(){
//xxx: 省略其它抽象...
}
}
# 拉取消息处理器
/*xxx: 拉取消息处理器 */
public class PullMessageProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
/*xxx: 处理 拉取消息请求 */
return this.processRequest(ctx.channel(), request, true);
}
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
/*xxx: 创建响应对象 */
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
/*xxx: 序列化请求头 */
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
/*xxx: 获取消费者分组的信息, 包括 分组名称、消费模式(并发消费、顺序消费)、消费者启动时是否从尾部消费、消费者超时时间、消费者最大重试次数、消息模式(集群消息,广播消息) */
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
/*xxx: 获取主题配置信息 */
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
{
/*xxx: 重写静态主题的消息请求, 对于这一类主题, 消息生产者可以直接发送消息到该主题下的指定队列中,而无需进行路由选择*/
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
}
/*xxx: 获取消息存储器 */
final MessageStore messageStore = brokerController.getMessageStore();
/*xxx: 获取队列id */
int queueId = requestHeader.getQueueId();
/*xxx: 查询指定消息队列的消费进度, 并将消费进度重置为指定的位置 */
Long resetOffset = brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topic, group, queueId);
//xxx: 省略其它抽象...
/*xxx: 通过 拉取消息结果处理器,进行处理 */
return this.pullMessageResultHandler.handle(
getMessageResult,
request,
requestHeader,
channel,
subscriptionData,
subscriptionGroupConfig,
brokerAllowSuspend,
messageFilter,
response,
mappingContext
);
}
}
# 客户端实例管理处理器
/*xxx: 客户端管理处理器 */
public class ClientManageProcessor implements NettyRequestProcessor {
@Override
/*xxx: 客户端管理处理器 */
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
/*xxx: 心跳请求,会 根据数据信息,设置生产者 或者消费者 */
case RequestCode.HEART_BEAT:
return this.heartBeat(ctx, request);
/*xxx: 注销客户端 */
case RequestCode.UNREGISTER_CLIENT:
return this.unregisterClient(ctx, request);
case RequestCode.CHECK_CLIENT_CONFIG:
/*xxx: 检查客户端配置 */
return this.checkClientConfig(ctx, request);
default:
break;
}
return null;
}
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
ctx.channel(),
heartbeatData.getClientID(),
request.getLanguage(),
request.getVersion()
);
/*xxx: 如果心跳包中,包含了 consuerData信息,则注册为 消费者*/
for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
/*xxx: 消费者,需要配置 消费分组信息 , 消费类型, 消息模型, 消费的起点,订阅消息*/
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
consumerData.getGroupName(),
clientChannelInfo,
consumerData.getConsumeType(),
consumerData.getMessageModel(),
consumerData.getConsumeFromWhere(),
consumerData.getSubscriptionDataSet(),
isNotifyConsumerIdsChangedEnable
);
}
/*xxx: 如果心跳包里面,包含了 producer数据,则注册为生产者 */
for (ProducerData data : heartbeatData.getProducerDataSet()) {
/*xxx: 生产者,包含 组名即可 */
this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
clientChannelInfo);
}
}
}
# more
此处省略
# broker-客户端
- 未直接使用
客户端
组件,主要用作服务端
# mqClientInstance-服务端
- 未用作
服务端
角色,主要用于客户端
# mqClientInstance-客户端
public class MQClientAPIImpl implements NameServerUpdateCallback {
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig) {
this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}
/*xxx: 同步发送操作 */
private SendResult sendMessageSync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
/*xxx: 执行同步操作 */
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response, addr);
}
/*xxx: 拉取消息 */
public PullResult pullMessage(
final String addr,
final PullMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request;
if (PullSysFlag.hasLitePullFlag(requestHeader.getSysFlag())) {
request = RemotingCommand.createRequestCommand(RequestCode.LITE_PULL_MESSAGE, requestHeader);
} else {
/*xxx: 默认情况下, 使用标准的 pull_message 获取消息 */
request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
}
switch (communicationMode) {
/*xxx: 通信类型,分为同步,异步, 单步*/
case ONEWAY:
/*xxx: 单步获取消息的方式,禁止*/
assert false;
return null;
case ASYNC:
/*xxx: 异步获取消息*/
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
case SYNC:
/*xxx: 同步获取消息 */
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
break;
}
return null;
}
}
# 客户端远程处理
public class ClientRemotingProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.CHECK_TRANSACTION_STATE:
return this.checkTransactionState(ctx, request);
case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
return this.notifyConsumerIdsChanged(ctx, request);
case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
return this.resetOffset(ctx, request);
case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
return this.getConsumeStatus(ctx, request);
case RequestCode.GET_CONSUMER_RUNNING_INFO:
return this.getConsumerRunningInfo(ctx, request);
/*xxx: 直接消费消息, 由broker节点触发*/
case RequestCode.CONSUME_MESSAGE_DIRECTLY:
return this.consumeMessageDirectly(ctx, request);
case RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT:
return this.receiveReplyMessage(ctx, request);
default:
break;
}
return null;
}
}
# 消息模型基础设施
# MappedFile消息文件
- 用来进行文件内存映射的类
- 将一个文件映射到内存中的一段连续的地址空间,使得可以通过内存的方式读写文件,从而提高了消息读写的效率
- 其文件的命名规则为:
00000001000000000000
00100000000000000000
09000000000020000000
二十位数字组成,其中fileName[n] = fileName[n - 1] + n * mappedFileSize
;fileName[0] = startOffset - (startOffset % this.mappedFileSize)
- 当前Broker中的所有消息都是落盘到mappedFile文件中,其大小默认为1G(小于等于1G),文件名由20位十进制数构成,表示当前文件的第一条消息的起始位移偏移量
- 无论当前Broker中,存放着多少Topic的消息,都被顺序写入到mappedFile文件中的
//xxx: 将文件映射到内存中的过程就是将文件中的每一个字节都映射到了内存的一段连续的地址空间
public interface MappedFile {
//xxx: 获取文件名
String getFileName();
/*xxx: 获取文件大小 */
int getFileSize();
//xxx: 同步追加消息
AppendMessageResult appendMessages(MessageExtBatch message, AppendMessageCallback messageCallback, PutMessageContext putMessageContext);
/*xxx: 刷盘 */
int flush(int flushLeastPages);
//xxx: 获取文件
File getFile();
}
# 消息单元
- mappedFile文件内容,由一个个的消息单元构成;
- 每个消息单元,包含消息总长度
msgLen
,消息物理位置physicalOffset
,消息体内容body
,消息体长度bodyLength
,消息主题topic
,topic长度topicLength
,消息生产者bornHost
,消息发送时间戳bornTimestamp
,消息所在队列QueueId,消息所在queue中存储的偏移量QueueOffset的 属性 - 下方的文件,仅展示了其需要存放的核心方法
class CommitLog{
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
int pos = 4 + 4 + 4 + 4 + 4;
// 6 QUEUEOFFSET
preEncodeBuffer.putLong(pos, queueOffset);
pos += 8;
// 7 PHYSICALOFFSET
preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
pos += 8 + 4 + 8 + ipLen;
// refresh store time stamp in lock
preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
byteBuffer.put(preEncodeBuffer);
}
}
# commitLog存储
- commitlog是一个目录,里面存放着很多mappedFile文件;
- 一个broker中,仅包含一个commitlog目录;
# consumeQueue
- consumequeue文件是commitlog的索引文件,可以根据consumequeue定位到具体的信息;
- 每个topic会在
~/store/consumequeue
目录中,创建一个目录,目录名为topic名称,在该topic目录下,会再为每个该topic的queue
建立一个目录,目录名为queueId; - 每个consumequeue文件,可以包含20w个索引条目,每个索引条目包含三个重要属性:
- 消息在mappedFile文件中的偏移量(8字节)
- 消息长度(4字节)
- 消息tag的hashcode值(8字节)
- 一个consumequeue文件中的所有消息的topic一定是相同的,但是每条消息的tag可能是不同的;
public interface ConsumeQueueInterface extends FileQueueLifeCycle {
/*xxx: 获取主题*/
String getTopic();
/*xxx: 获取队列id*/
int getQueueId();
/*xxx: 获取索引条目*/
CqUnit get(long index);
/*xxx: 获取队列中的消息数量*/
long getMessageTotalInQueue();
/*xxx: 新增索引*/
void increaseQueueOffset(QueueOffsetOperator queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum);
/*xxx: 获取队列中的最小索引*/
long getMinOffsetInQueue();
/*xxx: 获取队列中的最大索引*/
long getMaxOffsetInQueue();
}
/*xxx: 消费队列 用于存储消息消费进度 */
public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
@Override
public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg,
short messageNum) {
String topicQueueKey = getTopic() + "-" + getQueueId();
queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);
//xxx: 省略其它抽象...
}
}
public class QueueOffsetOperator {
private ConcurrentMap<String, Long> topicQueueTable = new ConcurrentHashMap<>(1024);
public void increaseQueueOffset(String topicQueueKey, short messageNum) {
Long queueOffset = ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k -> 0L);
topicQueueTable.put(topicQueueKey, queueOffset + messageNum);
}
@Override
public long getMessageTotalInQueue() {
return this.getMaxOffsetInQueue() - this.getMinOffsetInQueue();
}
}
# 消息发送过程
- broker根据queueId,获取到该消息对应索引条目要在
consumequeue
目录中的写入偏移量,即QueueOffset; - 将queueId,queueOffset等数据,与消息一起封装为消息单元;
- 将消息单元写入到commitlog;
- 形成消息
索引条目
; - 将消息索引条目分发到相应的consumequeue;
# 消息接收过程
- consumer获取到其要消费消息所在queue的
消费偏移量
,计算出其要消费消息的消息offset
;- 消费offset即消费进度,consumer对某个queue的消费offset,即消费到了该queue的第几条消息;
- consumer向Broker发送拉取请求,其中包含了要拉取消息的
queue
,消息offset
和消息tag
;
- broker计算在该consumequeue中的
queueOffset
;queueOffset=消息offsetset*20字节 - 从该queueOffset处开始,向后查找第一个指定tag的索引条目;
- 解析该索引条目的前8个字节,即可定位到该消息在commitlog中的commitlog offset;
- 从对应commitlog offset中读取消息单元,并发送给consumer;
# namesrv的源码流程
# 启动引导
/*xxx: namesrv的启动类 */
public class NamesrvStartup {
public static void main(String[] args) {
/*xxx: 启动namesrv控制器*/
main0(args);
/*xxx: 启动namesrv控制管理器 */
controllerManagerMain();
}
public static NamesrvController main0(String[] args) {
/*xxx: 解析配置 */
parseCommandlineAndConfigFile(args);
NamesrvController controller = createAndStartNamesrvController();
}
/*xxx: 启动namesrv控制器 */
public static NamesrvController createAndStartNamesrvController() throws Exception {
/*xxx: 创建namesrv控制器*/
NamesrvController controller = createNamesrvController();
//xxx: 启动控制器
start(controller);
/*xxx: 获取netty配置,并给出提示 */
NettyServerConfig serverConfig = controller.getNettyServerConfig();
}
/*xxx: 创建namesrv控制器 */
public static NamesrvController createNamesrvController() {
/*xxx: namesrv的实例化,依赖于: namesrv本身的配置, netty的配置, nettyClient配置*/
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig, nettyClientConfig); controller.getConfiguration().registerConfig(properties);
return controller;
}
/*xxx: 启动 namesrv控制器 */
public static NamesrvController start(final NamesrvController controller) throws Exception {
/*xxx: 容器初始化 */
boolean initResult = controller.initialize();
if (!initResult) {
/*xxx: 初始化失败,则退出程序 */
controller.shutdown();
System.exit(-3);
}
/*xxx: 启动控制器 */
controller.start();
}
}
# namesrv控制器初始化过程
/*xxx: namesrv控制器, namesrv的核心组件 */
/*xxx: 该组件存在核心的声明周期方法: 初始化, 启动,以及 关闭*/
public class NamesrvController {
/*xxx: 初始化方法*/
public boolean initialize() {
//xxx: 加载配置
loadConfig();
//xxx: 初始化联网组件(remotingServer 与 remotingClient )
initiateNetworkComponents();
//xxx: 初始化线程池
initiateThreadExecutors();
//xxx: 注册请求处理器(处理客户端请求)
registerProcessor();
//xxx: 启动调度服务
startScheduleService();
//xxx: 初始化ssl上下文
initiateSslContext();
//xxx: 初始化rpc钩子
initiateRpcHooks();
return true;
}
}
# 初始化联网组件
class Test{
private void initiateNetworkComponents() {
/*xxx: netty远程服务器 */
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
/*xxx: netty远程客户端*/
this.remotingClient = new NettyRemotingClient(this.nettyClientConfig);
}
}
# 初始化线程池
class Test{
private void initiateThreadExecutors() {
/*xxx: 根据配置设置 客户端请求线程池队列 */
this.clientRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getClientRequestThreadPoolQueueCapacity());
/*xxx: 根据配置设置 客户端请求线程池 */
this.clientRequestExecutor = new ThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(), this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new ThreadFactoryImpl("ClientRequestExecutorThread_")) {
@Override
protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
return new FutureTaskExt<>(runnable, value);
}
};
}
}
# 绑定客户端请求处理器
class Test{
private void registerProcessor() {
/*xxx: 初始化客户端请求处理器,并进行绑定*/
ClientRequestProcessor clientRequestProcessor = new ClientRequestProcessor(this);
/*xxx: 将客户端请求处理器 与 服务器 相绑定 */
this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor);
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);
}
}
# 启动调度服务
class Test{
private void startScheduleService() {
//xxx: 每隔5秒钟,进行broker扫码,检查超时的broker, 并关闭信道
this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,
5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);
//xxx: 每隔10分钟,打印配置表状态
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,
1, 10, TimeUnit.MINUTES);
//xxx: 每隔一秒钟打印broker中的队列状态
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
NamesrvController.this.printWaterMark();
} catch (Throwable e) {
LOGGER.error("printWaterMark error.", e);
}
}, 10, 1, TimeUnit.SECONDS);
}
}
# 初始化路由钩子
class Test{
private void initiateRpcHooks() {
//xxx: 注册rpc钩子, 实现基于zone地域路由的功能
this.remotingServer.registerRPCHook(new ZoneRouteRPCHook());
}
}
# namesrv控制器启动过程
public class NamesrvController{
//xxx: 启动namesrv控制器 (启动联网组件: remotingServer, remotingClient, )
public void start() throws Exception {
//xxx: 启动提供远程服务的服务器
this.remotingServer.start();
//xxx: 启动客户端服务
this.remotingClient.start();
if (this.fileWatchService != null) {
/*xxx: 启动文件监听服务*/
this.fileWatchService.start();
}
//xxx: 启动路由维护服务
this.routeInfoManager.start();
}
}
# 服务端启动
- 提供接收外部服务的能力
# 客户端启动
- 提供主动与外部通信的能力
# 文件监听服务启动
- 自动加载最新的配置文件,并重新初始化NameServer中的相关配置,从而避免了手动重启NameServer的麻烦
# 路由维护服务启动
- 用于管理和维护broker和topic之间的路由关系;
# namesrv控制器关闭过程
class Test{
public void shutdown() {
this.remotingClient.shutdown();
this.remotingServer.shutdown();
this.defaultExecutor.shutdown();
this.clientRequestExecutor.shutdown();
this.scheduledExecutorService.shutdown();
this.scanExecutorService.shutdown();
this.routeInfoManager.shutdown();
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
}
}
# broker的源码流程
# 启动引导
/*xxx: broker启动类*/
/*xxx: broker是rocketMQ核心 */
public class BrokerStartup {
public static void main(String[] args) {
//xxx: 创建和启动broker控制器组件
start(createBrokerController(args));
}
/*xxx: 创建broker控制器 */
public static BrokerController createBrokerController(String[] args) {
/*xxx: 构建broker控制器 */
BrokerController controller = buildBrokerController(args);
/*xxx: broker控制器初始化 */
boolean initResult = controller.initialize();
if (!initResult) {
/*xxx: 初始化失败,则退出程序*/
controller.shutdown();
System.exit(-3);
}
}
/*xxx: 启动broker控制器 */
public static BrokerController start(BrokerController controller) {
/*xxx: 启动broker控制器 */
controller.start();
}
}
# broker控制器初始化过程
/*xxx: broker控制器组件, 核心生命周期方法包括 初始化,启动,销毁 */
public class BrokerController {
/*xxx: 初始化 */
public boolean initialize() throws CloneNotSupportedException {
/*xxx: 加载主题管理器 */
boolean result = this.topicConfigManager.load();
/*xxx: 加载主题队列映射管理器*/
result = result && this.topicQueueMappingManager.load();
/*xxx: 加载消费者消费位点管理器*/
result = result && this.consumerOffsetManager.load();
/*xxx: 加载订阅组管理器 */
result = result && this.subscriptionGroupManager.load();
/*xxx: 加载 消费者过滤器管理器 */
result = result && this.consumerFilterManager.load();
/*xxx: 加载 消费者序列管理器 */
result = result && this.consumerOrderInfoManager.load();
/*xxx: 实例化消息存储器 */
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
this.messageStore = MessageStoreFactory.build(context, defaultMessageStore);
/*xxx: 消息存储器加载 */
result = result && this.messageStore.load();
/*xxx: broker监控管理器 */
this.brokerMetricsManager = new BrokerMetricsManager(this);
/*xxx: 顺利加载后*/
if (result) {
/*xxx: 初始化远程计算服务器*/
initializeRemotingServer();
/*xxx: 初始化 资源信息*/
initializeResources();
/*xxx: 注册处理器 */
registerProcessor();
/*xxx: 初始化调度任务*/
initializeScheduledTasks();
/*xxx: 初始化事务*/
initialTransaction();
/*xxx: 初始化访问控制 */
initialAcl();
/*xxx: 初始化rpc钩子 */
initialRpcHooks();
}
}
}
# 初始化联网组件
class Test{
protected void initializeRemotingServer() throws CloneNotSupportedException {
/*xxx: 初始化远程监听服务 */
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
/*xxx: 备用远程监听服务 */
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
}
}
# 初始化基础设施资源
/*xxx: broker控制器组件, 核心生命周期方法包括 初始化,启动,销毁 */
public class BrokerController {
protected void initializeResources() {
/*xxx: 调度线程池*/
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryImpl("BrokerControllerScheduledThread", true, getBrokerIdentity()));
/*xxx:发送消息线程池 */
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity()));
/*xxx: 拉取消息线程池*/
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_", getBrokerIdentity()));
/*xxx: 轻量拉取消息线程池*/
this.litePullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getLitePullMessageThreadPoolNums(),
this.brokerConfig.getLitePullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.litePullThreadPoolQueue,
new ThreadFactoryImpl("LitePullMessageThread_", getBrokerIdentity()));
/*xxx: 存放消息线程池*/
this.putMessageFutureExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPutMessageFutureThreadPoolNums(),
this.brokerConfig.getPutMessageFutureThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.putThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity()));
/*xxx: 消息确认线程池*/
this.ackMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getAckMessageThreadPoolNums(),
this.brokerConfig.getAckMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.ackThreadPoolQueue,
new ThreadFactoryImpl("AckMessageThread_", getBrokerIdentity()));
/*xxx: 查询消息线程池 */
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.queryThreadPoolQueue,
new ThreadFactoryImpl("QueryMessageThread_", getBrokerIdentity()));
/*xxx: broker管理线程池 */
this.adminBrokerExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getAdminBrokerThreadPoolNums(),
this.brokerConfig.getAdminBrokerThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.adminBrokerThreadPoolQueue,
new ThreadFactoryImpl("AdminBrokerThread_", getBrokerIdentity()));
/*xxx: 客户端管理线程池 */
this.clientManageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getClientManageThreadPoolNums(),
this.brokerConfig.getClientManageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_", getBrokerIdentity()));
/*xxx: 心跳线程池 */
this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getHeartbeatThreadPoolNums(),
this.brokerConfig.getHeartbeatThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_", true, getBrokerIdentity()));
/*xxx: 消费者管理线程池 */
this.consumerManageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getConsumerManageThreadPoolNums(),
this.brokerConfig.getConsumerManageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumerManagerThreadPoolQueue,
new ThreadFactoryImpl("ConsumerManageThread_", true, getBrokerIdentity()));
/*xxx: 回复消息线程池 */
this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.replyThreadPoolQueue,
new ThreadFactoryImpl("ProcessReplyMessageThread_", getBrokerIdentity()));
/*xxx: 结束事务线程池*/
this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getEndTransactionThreadPoolNums(),
this.brokerConfig.getEndTransactionThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.endTransactionThreadPoolQueue,
new ThreadFactoryImpl("EndTransactionThread_", getBrokerIdentity()));
/*xxx: 负载均衡线程池 */
this.loadBalanceExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(),
this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.loadBalanceThreadPoolQueue,
new ThreadFactoryImpl("LoadBalanceProcessorThread_", getBrokerIdentity()));
//xxx: broker心跳线程池
this.brokerHeartbeatExecutorService = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryImpl("rokerControllerHeartbeatScheduledThread", getBrokerIdentity()));
//xxx: topic队列映射清理线程池
this.topicQueueMappingCleanService = new TopicQueueMappingCleanService(this);
}
}
# 注册处理器
class Test {
public void registerProcessor() {
/*xxx: 发送消息钩子*/
sendMessageProcessor.registerSendMessageHook(sendMessageHookList);
//xxx: 消费消息钩子
sendMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
//xxx: 将发送消息处理器 与 服务进行绑定
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
/*xxx: 拉取消息处理器 与 服务绑定 */
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.LITE_PULL_MESSAGE, this.pullMessageProcessor, this.litePullMessageExecutor);
/*xxx: 消费消息钩子 */
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/*xxx: peek消息处理器*/
this.remotingServer.registerProcessor(RequestCode.PEEK_MESSAGE, this.peekMessageProcessor, this.pullMessageExecutor);
/*xxx: pop消息处理器 */
this.remotingServer.registerProcessor(RequestCode.POP_MESSAGE, this.popMessageProcessor, this.pullMessageExecutor);
/*xxx: 消息确认处理器 */
this.remotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor);
/*xxx: 消息重新消费处理器 */
this.remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
/*xxx: 绑定 通知处理器 */
this.remotingServer.registerProcessor(RequestCode.NOTIFICATION, this.notificationProcessor, this.pullMessageExecutor);
/*xxx: poll方式消费消息处理器 */
this.remotingServer.registerProcessor(RequestCode.POLLING_INFO, this.pollingInfoProcessor, this.pullMessageExecutor);
/*xxx: 绑定 响应消息处理器 */
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
/*xxx: 绑定查询消息处理器 */
NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
/*xxx: 绑定客户端管理处理器 */
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);
/*xxx: 绑定消费者管理处理器 */
ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
/*xxx: 绑定 负载均衡处理器 */
this.remotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, queryAssignmentProcessor, loadBalanceExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, queryAssignmentProcessor, loadBalanceExecutor);
this.remotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, queryAssignmentProcessor, loadBalanceExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, queryAssignmentProcessor, loadBalanceExecutor);
/*xxx: 绑定 事务处理器 */
this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor);
/*xxx: 管理处理器*/
AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}
}
# 初始化调度任务
class Test{
/*xxx: 初始化调度任务*/
protected void initializeScheduledTasks() {
/*xxx: 初始化broker调度任务 */
initializeBrokerScheduledTasks();
}
}
# broker控制器启动过程
class Test{
/*xxx: 启动broker控制器 */
public void start() throws Exception {
/*xxx: 启动基础服务 */
startBasicService();
/*xxx: 将当前broker 注册到 namesrv */
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
//xxx: 同步broker集群信息
BrokerController.this.syncBrokerMemberGroup();
}
}
# 启动基础服务
class Test{
/*xxx: 启动基础服务 */
protected void startBasicService() throws Exception {
/*xxx: 消息存储器启动*/
this.messageStore.start();
/*xxx: 定时消息存储器启动 */
this.timerMessageStore.start();
/*xxx: 副本管理器启动 */
this.replicasManager.start();
/*xxx: 服务锁,等待服务器启动完毕*/
remotingServerStartLatch.await();
/*xxx: 启动远程计算服务器*/
this.remotingServer.start();
/*xxx: 备用服务启动 */
this.fastRemotingServer.start();
/*xxx: 更新消息存储器地址*/
this.storeHost = new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort());
if (this.popMessageProcessor != null) {
/*xxx: pop消息处理器启动相应的服务 */
this.popMessageProcessor.getPopLongPollingService().start();
this.popMessageProcessor.getPopBufferMergeService().start();
this.popMessageProcessor.getQueueLockManager().start();
}
/*xxx: 应答消息处理器,启动相应服务 */
if (this.ackMessageProcessor != null) {
this.ackMessageProcessor.startPopReviveService();
}
if (this.notificationProcessor != null) {
/*xxx: 通知消息处理器,启动相应服务 */
this.notificationProcessor.getPopLongPollingService().start();
}
if (this.topicQueueMappingCleanService != null) {
/*xxx: topic队列映射清理服务启动*/
this.topicQueueMappingCleanService.start();
}
if (this.fileWatchService != null) {
/*xxx: 文件监听服务启动 */
this.fileWatchService.start();
}
if (this.pullRequestHoldService != null) {
/*xxx: pull请求服务启动*/
this.pullRequestHoldService.start();
}
if (this.clientHousekeepingService != null) {
/*xxx: 客户端内务处理服务 启动*/
this.clientHousekeepingService.start();
}
if (this.brokerStatsManager != null) {
/*xxx: broker状态管理器启动*/
this.brokerStatsManager.start();
}
if (this.broadcastOffsetManager != null) {
/*xxx: 广播位移管理器 启动*/
this.broadcastOffsetManager.start();
}
if (this.topicRouteInfoManager != null) {
/*xxx: topic信息管理器启动 */
this.topicRouteInfoManager.start();
}
}
}
# 注册到namesrv
class Test{
/*xxx: 启动broker控制器 */
public void start() throws Exception {
/*xxx: 启动基础服务 */
startBasicService();
/*xxx: 将当前broker 注册到 namesrv */
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
//xxx: 同步broker集群信息
BrokerController.this.syncBrokerMemberGroup();
}
}
# 备注
- RemotingClient相当于是要给主动通信的工具,类似于
ApacheHttpClient
- 不过它是基于netty的,同时对应用层协议封装;
- 相当于定制版的联网客户端;
# broker控制器关闭过程
class Test{
public void shutdown() {
shutdownBasicService();
for (ScheduledFuture<?> scheduledFuture : scheduledFutures) {
scheduledFuture.cancel(true);
}
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.shutdown();
}
}
}