mqtt协议-broker之moqutte源码研究六之集群-创新互联
moquette的集群功能是通过Hazelcast来实现的,对Hazelcast不了解的同学可以自行Google以下。
在讲解moquette的集群功能之前需要讲解一下moquette的拦截器,因为moquette对Hazelcast的集成本身就是通过拦截器来实现的。
一。拦截器
io.moquette.spi.impl.ProtocolProcessor类里面有一个BrokerInterceptor类,这个类就是broker拦截器,这个对象,在processConnect,processPubAck,processPubComp,processDisconnect,processConnectionLost,processUnsubscribe,processSubscribe,processPublish等八个地方都用到了,说明在broker处理各个报文的关键期间都会用到,我们先看一这个类的结构
private static final Logger LOG = LoggerFactory.getLogger(BrokerInterceptor.class);
private final Map, List> handlers;
private final ExecutorService executor;
private BrokerInterceptor(int poolSize, List handlers) {
LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers));
this.handlers = new HashMap<>();
for (Class> messageType : InterceptHandler.ALL_MESSAGE_TYPES) {
this.handlers.put(messageType, new CopyOnWriteArrayList());
}
for (InterceptHandler handler : handlers) {
this.addInterceptHandler(handler);
}
executor = Executors.newFixedThreadPool(poolSize);
}
里面有个map,有一个,看构造方法,发现key是(InterceptConnectMessage.class, InterceptDisconnectMessage.class,
InterceptConnectionLostMessage.class, InterceptPublishMessage.class, InterceptSubscribeMessage.class,
InterceptUnsubscribeMessage.class, InterceptAcknowledgedMessage.class)其中的一个,发现总共有七个消息类,这七个消息类,刚好与上面的八个使用的地方对应上(processPubAck,processPubComp都对应InterceptAcknowledgedMessage)。value是一个list,里面放的是InterceptHandler,即处理器。
从这个map可以看出来,moquette允许你在,各个关键阶段注册一系列的处理器,供它回调。
二。HazelcastInterceptHandler
HazelcastInterceptHandler集成了AbstractInterceptHandler,AbstractInterceptHandler实现了InterceptHandler,这里吐槽一下Java,Java要求实现一个Java的interface就要实现所有的接口,可是很多时候我就只想实现其中一个方法,如果接口里面有10个方法结果
我的实现类里面会出现9个空方法,显得代码非常不简洁,当然,用个抽象类缓冲一下是一个办法,但是我又得加一个类,还好Java8,有个默认方法,能够解决这个问题。
我们跟踪一下,看一下HazelcastInterceptHandler,是什么时候被放进BrokerInterceptor的。
从启动类Server里面的main一路跟踪到下面的代码
public void startServer(IConfig config, List extends InterceptHandler> handlers, ISslContextCreator sslCtxCreator,
IAuthenticator authenticator, IAuthorizator authorizator) throws IOException {
if (handlers == null) {
handlers = Collections.emptyList();
}
LOG.info("Starting Moquette Server. MQTT message interceptors={}", getInterceptorIds(handlers));
scheduler = Executors.newScheduledThreadPool(1);
final String handlerProp = System.getProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME);
if (handlerProp != null) {
config.setProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME, handlerProp);
}
configureCluster(config);
final String persistencePath = config.getProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME);
LOG.info("Configuring Using persistent store file, path={}", persistencePath);
m_processorBootstrapper = new ProtocolProcessorBootstrapper();
final ProtocolProcessor processor = m_processorBootstrapper.init(config, handlers, authenticator, authorizator,
this);
LOG.info("Initialized MQTT protocol processor");
if (sslCtxCreator == null) {
LOG.warn("Using default SSL context creator");
sslCtxCreator = new DefaultMoquetteSslContextCreator(config);
}
LOG.info("Binding server to the configured ports");
m_acceptor = new NettyAcceptor();
m_acceptor.initialize(processor, config, sslCtxCreator);
m_processor = processor;
LOG.info("Moquette server has been initialized successfully");
m_initialized = true;
}
上面有三点需要注意,
1.是从启动参数里面获取INTERCEPT_HANDLER_PROPERTY_NAME = "intercept.handler",放入config,说明注册处理器,是需要通过启动参数指定的,而且目前的版本还只能指定一个。
2.configureCluster(config);配置集群,我们暂且不看怎么配置的。后面再讲
3.通过ProtocolProcessorBootstrapper初始化ProtocolProcessor,这个类有多重要,前面几篇文章已经讲过了。它的init方法,很有意思,里面做了很多事情,基本上就是初始化ProtocolProcessor里面的各种对象,其中与本文要讲的摘抄下来
LOG.info("Configuring message interceptors...");
List observers = new ArrayList<>(embeddedObservers);
String interceptorClassName = props.getProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME);
if (interceptorClassName != null && !interceptorClassName.isEmpty()) {
InterceptHandler handler = loadClass(interceptorClassName, InterceptHandler.class, Server.class, server);
if (handler != null) {
observers.add(handler);
}
}
BrokerInterceptor interceptor = new BrokBrokerInterceptor(props, observers);
这里面就是根据classname,实例化处理器,然后创建BrokerInterceptor对象,它的狗爪方法里面调用了这个方法addInterceptHandler
@Override
public void addInterceptHandler(InterceptHandler interceptHandler) {
Class>[] interceptedMessageTypes = getInterceptedMessageTypes(interceptHandler);
LOG.info("Adding MQTT message interceptor. InterceptorId={}, handledMessageTypes={}",
interceptHandler.getID(), interceptedMessageTypes);
for (Class> interceptMessageType : interceptedMessageTypes) {
this.handlers.get(interceptMessageType).add(interceptHandler);
}
}
private static Class>[] getInterceptedMessageTypes(InterceptHandler interceptHandler) {
Class>[] interceptedMessageTypes = interceptHandler.getInterceptedMessageTypes();
if (interceptedMessageTypes == null) {
return InterceptHandler.ALL_MESSAGE_TYPES;
}
return interceptedMessageTypes;
}
调用了interceptHandler.getInterceptedMessageTypes();说明每个InterceptHandler都实现了这个方法,即它自身必须告诉别人,它准备注册到哪几个事件上,即上面的七个类型。
一个处理器,可以注册到上面七个中的一个,或多个,或全部,如果返回的是空,则默认是全部。HazelcastInterceptHandler就注册到了所有的上面
到此我们就明白了HazelcastInterceptHandler注册到拦截器的全过程。
三。集群间通信。
我们回答一下上面的问题,看一下configureCluster的全过程
private void configureCluster(IConfig config) throws FileNotFoundException {
LOG.info("Configuring embedded Hazelcast instance");
String interceptHandlerClassname = config.getProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME);
if (interceptHandlerClassname == null || !HZ_INTERCEPT_HANDLER.equals(interceptHandlerClassname)) {
LOG.info("There are no Hazelcast intercept handlers. The server won't start a Hazelcast instance.");
return;
}
String hzConfigPath = config.getProperty(BrokerConstants.HAZELCAST_CONFIGURATION);
if (hzConfigPath != null) {
boolean isHzConfigOnClasspath = this.getClass().getClassLoader().getResource(hzConfigPath) != null;
Config hzconfig = isHzConfigOnClasspath
? new ClasspathXmlConfig(hzConfigPath)
: new FileSystemXmlConfig(hzConfigPath);
LOG.info("Starting Hazelcast instance. ConfigurationFile={}", hzconfig);
hazelcastInstance = Hazelcast.newHazelcastInstance(hzconfig);
} else {
LOG.info("Starting Hazelcast instance with default configuration");
hazelcastInstance = Hazelcast.newHazelcastInstance();
}
listenOnHazelCastMsg();
}
1.判断是否注册了HazelcastInterceptHandler,如果没有直接跳出方法
2.从config里面获取hazelcast.configuration的位置,并且加载配置文件,同时创建HazelcastInstance实例。这个配置文件在moquette-master-improve/distribution/src/main/resources下有一个模版,也可以参考下面的
IP1:5701
5701
IP2:5701
3.监听HazelcastInstance,具体的监听对象是HazelcastListener,这个类刚好和HazelcastInterceptHandler构成一对。刚好用来完成集群间的同步
先看一下HazelcastInterceptHandler,里面只有一个方法,onPublish,即监听事件的发生,发送消息。这里面我最初有一个疑惑,因为我总觉得HazelcastInterceptHandler其实是没必要注册到所有的事件上的,但是它确实是注册到了七个事件。直到看到这个里面只有一个onPublish,我才明白,原来,它确实是注册了七个事件,brokerintercept也确实都回调了它,但是它只实现了这一个方法,其他的方法都是通过抽象方法继承来的空方法,也就是什么都没做。这样当然扩展性稍微好一点,主动权在处理器上面,但其实没有太大的必要。因为每个处理器想监听哪些事件,其实是知道的,监听几个注册几个就完了。
然后看一下HazelcastListener,里面也只有一个方法,onMessage,即当有集群中的其他节点发送消息给当前机器的时候,由Hazelcast,调用HazelcastListener的onMessage方法,看一下这个方法的逻辑
public void onMessage(Message msg) {
try {
if (!msg.getPublishingMember().equals(server.getHazelcastInstance().getCluster().getLocalMember())) {
HazelcastMsg hzMsg = msg.getMessageObject();
LOG.info("{} received from hazelcast for topic {} message: {}", hzMsg.getClientId(), hzMsg.getTopic(),
hzMsg.getPayload());
// TODO pass forward this information in somehow publishMessage.setLocal(false);
MqttQoS qos = MqttQoS.valueOf(hzMsg.getQos());
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(hzMsg.getTopic(), 0);
ByteBuf payload = Unpooled.wrappedBuffer(hzMsg.getPayload());
MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader, varHeader, payload);
server.internalPublish(publishMessage, hzMsg.getClientId());
}
} catch (Exception ex) {
LOG.error("error polling hazelcast msg queue", ex);
}
}
逻辑比较简单,构建消息体,调用server.internalPublish(publishMessage, hzMsg.getClientId());原来Server里面的这个方法干这个用的,我之前看到这个方法是一脸懵逼呀。说白了当收到其他的节点发过来的消息的时候,伪造成某个client的publish报文,接着往下看
这个方法内部其实调用的是ProtocolProcessor.internalPublish(msg, clientId),这个方法里面调用的是MessagesPublisher.publish3Subscribers(toStoreMsg, topic),这个方法很眼熟,是因为在这一篇中讲过,https://blog.51cto.com/13579730/2074290
因为Qos0PublishHandler,Qos1PublishHandler,Qos2PublishHandler底层都调的这个方法,至此发现集群间的消息同步,其实就是模仿的client向broker发送publish消息,或者可以理解成:
当某个broker节点收到来自一个连接到它的client发送的publish消息的时候,它不仅会分发给订阅这个消息并且连接到它的其他clients,同时会把消息分发给集群中的其他节点,以供集群中其他节点继续分发下去。
终于把集群讲完了。后面还有一篇,讲topic目录树的,这个系列就算完结了。哈哈
创新互联www.cdcxhl.cn,专业提供香港、美国云服务器,动态BGP最优骨干路由自动选择,持续稳定高效的网络助力业务部署。公司持有工信部办法的idc、isp许可证, 机房独有T级流量清洗系统配攻击溯源,准确进行流量调度,确保服务器高可用性。佳节活动现已开启,新人活动云服务器买多久送多久。
当前名称:mqtt协议-broker之moqutte源码研究六之集群-创新互联
链接地址:http://cdiso.cn/article/dhsded.html