# 概述
- 阿里巴巴开源的一个高性能服务框架,使得应用可通过高性能RPC实现服务的输出和输入功能;
- 高性能、轻量级的开源java RPC框架,提供了三个核心能力: 面向接口的远程方法调用,智能容错和负载均衡,服务自动注册和发现
# 发展史
- 早期是阿里巴巴内部使用的一个分布式服务治理框架
- 2012年开源
- 很多公司基于自身业务特性,进行优化和改进,典型的有:京东JSF,新浪Motan,当当dubbox
- 2014年10月,dubbo停止维护
- 2017年9月,阿里宣布重启dubbo,并做好了长期投入的准备;
- 2018年1月,dubbo创始人之一梁飞透露Dubbo3.0正在动工;
- 2018年2月,dubbo捐献给Apache基金会;
- 2019年5月21日,dubbo正式毕业,成为apache顶级开源项目;
- 2020年,发布3.0往云原生项目发展的战略计划
云原生: 基于分布式部署和统一运维管理的分布式云,以容器、微服务、DevOps等技术未基础建立的一套云技术产品体系。
# 发布史
- 2011.10,发布2.0.7,是开源后的第一个发布版;
- 2012.08, 发布2.5.0;
- 2017.09,发布2.5.4(中间暂停维护了);
- 2019.01,发布2.7.0
- 2021.02,发布2.7.9
- 2021.06,发布3.0
# 工作原理

# 架构设计

# 实践
# 定义服务接口
/**
* @author chenkh
* @time 2021/7/28
*/
public interface HelloService {
void sayHello(String name);
void sayHi(String name);
}
# 定义dubbo工具类
/**
* @author chenkh
* @time 2021/7/26
*/
public class DubboUtils {
public static boolean registerService(Class clazzImpl,Object serviceImpl,String appName,String registryCenter,String namespace){
ServiceConfig serviceConfig = new ServiceConfig();
serviceConfig.setInterface(clazzImpl);
serviceConfig.setInterface(clazzImpl.getName());
serviceConfig.setVersion("1.0");
serviceConfig.setGroup("practice");
serviceConfig.setRef(serviceImpl);
// 普通编码配置方式
ApplicationConfig application = new ApplicationConfig();
// 客户端应用名:可以任意取名
application.setName(appName);
application.setLogger("slf4j");
// 连接注册中心配置
RegistryConfig registry1 = new RegistryConfig();
registry1.setAddress(registryCenter);
registry1.setGroup(namespace);
List<RegistryConfig> registries = new ArrayList<>();
registries.add(registry1);
serviceConfig.setRegistries(registries);
serviceConfig.setApplication(application);
serviceConfig.export();
return true;
};
//创建轻量级消费者引用
private static ReferenceConfig createLightReferenceConfig(String appName,String registryCenter,String namespace) {
// 普通编码配置方式
ApplicationConfig application = new ApplicationConfig();
// 客户端应用名:可以任意取名
application.setName(appName);
application.setLogger("slf4j");
application.setQosPort(22223);
// 连接注册中心配置
RegistryConfig registry1 = new RegistryConfig();
registry1.setAddress(registryCenter);
registry1.setGroup(namespace);
List<RegistryConfig> registries = new ArrayList<>();
registries.add(registry1);
// 引用远程服务
// 当获取到实际的客户端连接后,该实例变得很重,里面封装了所有与注册中心及服务提供方连接,需要缓存
// 此时,尚处于一个轻量级的实例
ReferenceConfig reference = new ReferenceConfig();
reference.setApplication(application);
reference.setRegistries(registries);
reference.setCheck(false);
reference.setGroup("practice");
return reference;
}
private static String getTag(String... tag) {
String tagEnv = System.getProperty("dubbo.provider.tag");
if (tag != null && tag.length > 0 && StringUtils.isNotBlank(tag[0])) {
tagEnv = tag[0];
}
return tagEnv;
}
public static <T> T getService(Class<T> clazz, String version, String... tag) {
if (clazz == null || StringUtils.isBlank(version)) {
throw new IllegalArgumentException("DubboUtils#getService():参数不能为空.");
}
StringBuilder sb = new StringBuilder();
String key = sb.append(clazz.getName()).append(":").append(version).toString();
Object objService = serviceMap.get(key);
if (objService != null && !StringUtils.equals(objService.getClass().getName(), clazz.getName())) {
throw new IllegalArgumentException("DubboUtils#getService():返回值类型不符.期望:" + clazz.getName() +
",实际:" + objService.getClass().getName());
}
if (serviceMap.containsKey(key)) {
return (T) objService;
}
//首次使用时,创建轻量级客户端引用
ReferenceConfig reference = createLightReferenceConfig("dubbo-generic-automan-practice-consumer","zookeeper://localhost:2181","practice");
// 设置tag:用于灰度发布、线上程序调试
String tagEnv = getTag(tag);
if (StringUtils.isNotBlank(tagEnv)) {
ConsumerConfig consumerConfig = new ConsumerConfig();
consumerConfig.setTag(tagEnv);
reference.setConsumer(consumerConfig);
}
reference.setVersion(version);
// 弱类型接口名
reference.setInterface(clazz);
// 声明为泛化接口
reference.setGeneric("false");
//获取到客户端引用,此实例变得很重,需要缓存起来
objService = reference.get();
serviceMap.put(key, objService);
return (T) objService;
}
}
# 服务提供者
- 通过指定启动参数:DUBBO_DUBBO_IP_TO_REGISTRY=100.0.0.1可实现服务生产者ip自定义配置;
/**
* @author chenkh
* @time 2021/7/26
*/
public class DubboRunWithoutSpringPracticeProvider {
public static void main(String[] args) {
//注册服务
DubboUtils.registerService(HelloService.class,new HelloServiceImpl(),"dubbo-generic-automan-practice-provider","zookeeper://localhost:2181","practice");
Scanner scanner = new Scanner(System.in);
scanner.nextLine();
}
}
# 服务消费者
package com.automannn.dubbo.withoutSpring;
import com.automannn.dubbo.api.HelloService;
import com.automannn.dubbo.withoutSpring.util.DubboUtils;
/**
* @author chenkh
* @time 2021/7/28
*/
public class DubboRunWithoutSpringPracticeConsumer {
public static void main(String[] args) {
HelloService helloService= DubboUtils.getService(HelloService.class,"1.0","");
helloService.sayHello("automannn");
helloService.sayHi("automannn");
}
}
# 源码
# SPI机制(抽象工厂,获取接口的实现类)
- 扩展点加载器-容器机制
/*xxx: 用于加载 spi的实现类, 线程级别的单例,具有多种加载策略,包括 @SPI策略,@Adaptive策略,Spring的策略 */
/*xxx: 扩展点加载器 */
public class ExtensionLoader<T> {
/*xxx: 扩展点加载器缓存集合,同一类扩展点进行复用 */
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>(64);
/*xxx: 扩展点的实现缓存,复用*/
private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>(64);
/*xxx: 扩展点工厂, 每个扩展点加载器实例,都具有一个扩展点工厂,仅用于注入属性*/
private final ExtensionFactory objectFactory;
/*xxx: 扩展点集合缓存,只会加载(扫描)一次 */
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();
/*xxx: 是否具有扩展点注解 */
private static <T> boolean withExtensionAnnotation(Class<T> type) {
return type.isAnnotationPresent(SPI.class);
}
/*xxx: 有条件激活的扩展类 集合*/
private final Map<String, Object> cachedActivates = new ConcurrentHashMap<>();
/*xxx: 扩展点加载器 实例化*/
/*xxx: 抽象工厂模式*/
private ExtensionLoader(Class<?> type) {
this.type = type;
/*xxx: 1.当传入类型,是扩展点顶级工厂接口的时候,当前扩展点加载器实例的 扩展点工厂置空*/
/*xxx: 2.其它类型时,则用 扩展点工厂为空的 扩展点加载器,加载 兼容的扩展点实现(本质上,就是获取工厂的默认实现) */
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
/*xxx: 获取兼容的扩展点 */
public T getAdaptiveExtension() {
/*xxx: 首次获取时,进行创建,之后缓存 */
Object instance = cachedAdaptiveInstance.get();
instance = createAdaptiveExtension();
return (T) instance;
}
/*xxx: 创建兼容的扩展点实例*/
private T createAdaptiveExtension() {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
}
/*xxx: 从工厂中,自动注入参数。 类似spring注入bean */
private T injectExtension(T instance) {
//略...
}
/*xxx: 获取兼容的扩展点类 */
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
/*xxx: 生成扩展点类(就是某个扩展点工厂的默认实现)*/
private Class<?> createAdaptiveExtensionClass() {
/*xxx: 生成动态代理类,该动态代理类 会根据 传入的 默认实现名称,生成源码 */
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
ClassLoader classLoader = findClassLoader();
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
/*xxx: 获取扩展点类集合*/
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
/*xxx: 先检查缓存中,是否有配置信息,没有则会从 META-INF/services/, META-INF/dubbo/,META-INF/dubbo/internal这几个路径下加载*/
/*xxx: 2.7.7版本,将该功能抽成了 策略模式 */
/*xxx: 加载扩展点类 集合*/
classes = loadExtensionClasses();
cachedClasses.set(classes);
return classes;
}
/*xxx: 加载扩展点集合类 */
private Map<String, Class<?>> loadExtensionClasses() {
/*xxx: 检查是否有 @SPI注解,如果有,则获取注解中填写的名称,并缓存为默认实现名,如 @SPI("impl"),则会保存 impl 为默认实现*/
cacheDefaultExtensionName();
Map<String, Class<?>> extensionClasses = new HashMap<>();
/*xxx: 该项目捐献给 apache后,对alibaba的做了兼容 */
/*xxx: 策略提供路径,type提供名称*/
for (LoadingStrategy strategy : strategies) {
loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
}
return extensionClasses;
}
/*xxx: 检查是否配置了 默认扩展点实现*/
private void cacheDefaultExtensionName() {
/*xxx: 检查是否有 @SPI注解,如果有,则获取注解中填写的名称,并缓存为默认实现名,如 @SPI("impl"),则会保存 impl 为默认实现*/
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation == null) {
return;
}
String value = defaultAnnotation.value();
cachedDefaultName =value;
}
/*xxx: 加载类,会自动处理 @Adaptive注解, */
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name,
boolean overridden) throws NoSuchMethodException {
/*xxx: 如果是自适应类(Adaptive) 则缓存, 缓存的自适应类只能有一个*/
if (clazz.isAnnotationPresent(Adaptive.class)) {
cacheAdaptiveClass(clazz, overridden);
/*xxx: 如果是包装扩展类(Wrapper), 则直接加入包装扩展类的 Set集合*/
} else if (isWrapperClass(clazz)) {
cacheWrapperClass(clazz);
} else {
/*xxx: 自动激活注解 (Activate), 则缓存到自动激活的缓存中*/
clazz.getConstructor();
cacheActivateClass(clazz, names[0]);
}
}
private void cacheName(Class<?> clazz, String name) {
if (!cachedNames.containsKey(clazz)) {
cachedNames.put(clazz, name);
}
}
/*xxx: 缓存自适应类 */
private void cacheAdaptiveClass(Class<?> clazz, boolean overridden) {
/*xxx: 如果发现有多个自适应类,则抛出异常*/
if (cachedAdaptiveClass == null || overridden) {
cachedAdaptiveClass = clazz;
} else if (!cachedAdaptiveClass.equals(clazz)) {
throw new IllegalStateException("More than 1 adaptive class found: "
+ cachedAdaptiveClass.getName()
+ ", " + clazz.getName());
}
}
/*xxx: 获取自动激活的扩展类*/
public List<T> getActivateExtension(URL url, String key) {
return getActivateExtension(url, key, null);
}
public List<T> getActivateExtension(URL url, String key, String group) {
String value = url.getParameter(key);
return getActivateExtension(url, StringUtils.isEmpty(value) ? null : COMMA_SPLIT_PATTERN.split(value), group);
}
public List<T> getActivateExtension(URL url, String[] values, String group) {
/*xxx: 遍历 @Activate注解集合,根据传入的匹配条件,得到所偶符合激活条件的扩展类实现*/
for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
String name = entry.getKey();
Object activate = entry.getValue();
String[] activateGroup, activateValue;
if (activate instanceof Activate) {
activateGroup = ((Activate) activate).group();
activateValue = ((Activate) activate).value();
}
if (isMatchGroup(group, activateGroup)
&& !names.contains(name)
&& !names.contains(REMOVE_VALUE_PREFIX + name)
&& isActive(activateValue, url)) {
activateExtensions.add(getExtension(name));
}
}
/*xxx: 返回所有自动激活类集合*/
return activateExtensions;
}
}
- 扩展点加载器-获取实现类的三种方式
//todo
- 扩展点注解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
/*xxx: 声明 dubbo扩展点*/
/*xxx: Service Provider Interface, 是 JAVA提供的一套 用来被第三方实现 或者 扩展的 接口。
它可以用来启用框架扩展和替换组件。SPI的作用, 就是为这些被扩展的API寻找服务实现 */
/*xxx: SPI 是调用方 指定的 接口规范, 提供给外部用来实现,主要是框架扩展人员使用。*/
public @interface SPI {
String value() default "";
}
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
/*xxx: 作为 spi 的一个附加实现,通过此注解,可以实现 接口实现的优先级关系 */
/*xxx: 加在类上,则某个类为 spi的默认实现, 加在方法上,则方法支持根据参数选择实现类*/
public @interface Adaptive {
String[] value() default {};
}
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
/*xxx: 该注解 用于 自动激活某些与给定规则匹配的 扩展接口实现*/
public @interface Activate {
/*xxx: 激活与当前分组相匹配的扩展接口实现 */
String[] group() default {};
/*xxx: 激活 与给定的 配置相匹配的 扩展*/
String[] value() default {};
}
# 工厂机制(容器,实现自动注入)
@SPI
/*xxx: 扩展点工厂*/
public interface ExtensionFactory {
/*xxx: 获取扩展点*/
<T> T getExtension(Class<T> type, String name);
}
@Adaptive
/*xxx: 作为一开始的默认工厂进行实现,默认是以 dubbo spi管理的容器*/
/*xxx: 工厂代理*/
public class AdaptiveExtensionFactory implements ExtensionFactory {
/*xxx: 用来缓存所有工厂实现,包括 SpiExtensionFactory, SpringExtensionFactory*/
private final List<ExtensionFactory> factories;
public AdaptiveExtensionFactory() {
/*xxx: 工厂列表也是通过 SPI 实现的,因此可以在这里获取所有工厂的扩展点加载器*/
ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
/*xxx: 遍历所有的工厂名称,获取对应的工厂,并保存到 factories 列表中*/
for (String name : loader.getSupportedExtensions()) {
list.add(loader.getExtension(name));
}
/*xxx: 会通过TreeSet进行排序,SPI排在前面,Spring排在后面*/
/*xxx: 当调用 getExtension方法时,会遍历工厂,先从 SPI容器中获取扩展类,如果没找到,再从Spring容器中查找*/
factories = Collections.unmodifiableList(list);
}
@Override
public <T> T getExtension(Class<T> type, String name) {
/*xxx: 遍历所有的工厂进行查找,顺序是 SPI -> Spring*/
for (ExtensionFactory factory : factories) {
T extension = factory.getExtension(type, name);
if (extension != null) {
return extension;
}
}
return null;
}
}
- Spring容器
从spring上下文中获取依赖;
- SPI容器
/*xxx: 通过工厂类,加载 注解了 @SPI的实现类 */
public class SpiExtensionFactory implements ExtensionFactory {
@Override
public <T> T getExtension(Class<T> type, String name) {
/*xxx: 获取扩展点接口对应的Adaptive实现类*/
if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
/*xxx: 根据类型,获取所有的扩展点加载器*/
ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(type);
/*xxx: 如果缓存的扩展点类不为空,则直接返回 Adaptive实例*/
/*xxx: 实际返回的是某一个接口的注解了 @Adaptive的默认实现类,或者注解了 @Adaptive方法的动态代理类,可实现根据参数,决定适配类*/
if (!loader.getSupportedExtensions().isEmpty()) {
return loader.getAdaptiveExtension();
}
}
return null;
}
}
# URL实例抽象
/*xxx: 统一资源定位符,一般情况下,由多个配置组成,包括: 协议类型,主机,端口,路径等*/
public
class URL implements Serializable {
/*xxx: 协议类型*/
private final String protocol;
/*xxx:账户名*/
private final String username;
/*xxx:密码*/
private final String password;
// by default, host to registry
/*xxx:默认主机*/
private final String host;
// by default, port to registry
/*xxx:默认端口*/
private final int port;
/*xxx:路径*/
private final String path;
/*xxx:参数*/
private final Map<String, String> parameters;
/*xxx:方法参数*/
private final Map<String, Map<String, String>> methodParameters;
}
# RPC机制的实现-dubbo协议
- 协议层
@SPI("dubbo")
/*xxx: 协议抽象*/
public interface Protocol {
/*xxx: 默认协议端口 */
int getDefaultPort();
@Adaptive
/*xxx: 暴露服务给远程调用 */
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
/*xxx: 获取远程服务代理*/
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/*xxx: 获取所有提供该协议的服务器: 由地址字符串,以及服务描述URL组成 */
default List<ProtocolServer> getServers() {
return Collections.emptyList();
}
}
/*xxx: 抽象协议*/
public abstract class AbstractProtocol implements Protocol {
/*xxx: 协议暴露服务的缓存 */
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
/*xxx: 协议服务器缓存 */
protected final Map<String, ProtocolServer> serverMap = new ConcurrentHashMap<>();
/*xxx: 构造服务的key */
protected static String serviceKey(URL url) {
/*xxx: 获取绑定的端口*/
int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
return serviceKey(port, url.getPath(), url.getParameter(VERSION_KEY), url.getParameter(GROUP_KEY));
}
/*xxx: 根据 服务的端口,服务名称,服务版本,服务分组构建 服务键名 */
protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup);
}
@Override
/*xxx: 引用远程服务*/
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
/*xxx: 使用异步转同步的 装饰执行器 */
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
/*xxx: 通过协议引用远程服务*/
protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException;
}
/*xxx: dubbo协议,实现远程服务调用 */
public class DubboProtocol extends AbstractProtocol {
/*xxx: 默认的dubbo远程服务暴露端口 */
public static final int DEFAULT_PORT = 20880;
/*xxx: dubbo协议实体,单例模式实现 */
private static DubboProtocol INSTANCE;
@Override
/*xxx: 暴露服务*/
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
/*xxx: 首先从执行器中,获取 URL信息 */
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
/*xxx: dubbo协议 的exporter也会在本地维护一份缓存 */
exporterMap.put(key, exporter);
/*xxx: 省略部分抽象...*/
/*xxx:通过信息交换,以及网络传输层,将服务暴露 */
openServer(url);
/*xxx: 省略部分抽象...*/
return exporter;
}
/*xxx: 创建协议服务器 */
private void openServer(URL url) {
/*xxx: 获取服务器地址 */
/*xxx: 某个机器已经启动了netty服务,则不会再启动*/
String key = url.getAddress();
/*xxx: 调用此代码,默认作为服务端提供外部服务,也可以显式设置值,禁止此特性 */
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
/*xxx: 以当前的key,获取服务端实例 */
ProtocolServer server = serverMap.get(key);
if (server == null) {
/*xxx: 如果没有获取到,则创建*/
/*xxx: 根据url信息,创建服务 */
serverMap.put(key, createServer(url));
}else{
//xxx: 如当前地址,已开启服务,则调用reset方法
//该方法主要作用应该是记录,往注册中心记录,暂未跟代码了
server.reset(url);
}
}
}
/*xxx: 创建url服务*/
private ProtocolServer createServer(URL url) {
url = URLBuilder.from(url)
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
/*xxx: 默认情况下允许心跳机制,1分钟 */
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
/*xxx: 信息交换服务器,绑定url */
ExchangeServer server = Exchangers.bind(url, requestHandler);
return new DubboProtocolServer(server);
}
/*xxx: 请求处理适配器*/
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
/*xxx: 答复消息*/
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
/*xxx: 消息类型不是 Invocation的话,则 抛错*/
if (!(message instanceof Invocation)) {
//抛错...
}
Invocation inv = (Invocation) message;
/*xxx: 从 Invocation中,获取invoker */
Invoker<?> invoker = getInvoker(channel, inv);
/*xxx: 检查调用合法性(方法是否存在),略*/
/*xxx: 通过invoker 执行,并获取结果 */
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}
@Override
/*xxx: 接收到消息*/
public void received(Channel channel, Object message) throws RemotingException {
/*xxx: 接收到信息时,进行分派*/
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
/*xxx: 根据url,创建本地Invocation */
private Invocation createInvocation(Channel channel, URL url, String methodKey) {
/*xxx: 根据方法的键,获取对应的方法*/
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}
/*xxx: 设置远程调用的相关参数 */
RpcInvocation invocation = new RpcInvocation(method, url.getParameter(INTERFACE_KEY), new Class<?>[0], new Object[0]);
invocation.setAttachment(PATH_KEY, url.getPath());
invocation.setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY));
invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
invocation.setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY));
if (url.getParameter(STUB_EVENT_KEY, false)) {
invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
}
return invocation;
}
};
}
- 交换层
/*xxx: 服务交换器外观模式 */
public class Exchangers {
/*xxx: 服务交换层绑定地址,返回服务交换实例 */
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
/*xxx: 获取交换服务实例*/
public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
}
@SPI(HeaderExchanger.NAME)
/*xxx: 信息交换器的抽象工厂*/
public interface Exchanger {
@Adaptive({Constants.EXCHANGER_KEY})
/*xxx: 将url与交换处理器绑定 */
/*xxx: 服务端使用 */
ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;
@Adaptive({Constants.EXCHANGER_KEY})
/*xxx: 连接url,并提供 交换处理器 */
/*xxx: 客户端使用*/
ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}
/*xxx: 默认的消息发送器: header*/
/*xxx: dubbo的客户端与服务端实现,都基于此*/
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
/*xxx: 重型实例*/
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
/*xxx: 默认的连接实现,调用 网络传输层进行处理 */
/*xxx: 信息交换处理器,由 解码处理器装饰实现*/
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override
/*xxx: 重型实例*/
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
/*xxx: 默认的绑定实现, 调用 网络传输层进行实现 */
/*xxx: 信息交换处理器,由 解码处理器装饰实现*/
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
- 传输层
/*xxx: 网络传输层工厂*/
public class Transporters {
/*xxx: 将 url 与 信道处理器进行绑定 */
public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
/*xxx: 如果有多个信道处理器,则需要处理器进行分发 */
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
}
@SPI("netty")
/*xxx: 网络传输层 */
public interface Transporter {
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
/*xxx: 服务端调用*/
RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
/*xxx: 客户端调用 */
Client connect(URL url, ChannelHandler handler) throws RemotingException;
}
/*xxx: 默认的 网络传输层实现,netty4 */
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
/*xxx: 服务端 */
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
return new NettyServer(url, handler);
}
@Override
/*xxx: 客户端*/
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
return new NettyClient(url, handler);
}
}
- 网络实现
/*xxx: netty服务端*/
public class NettyServer extends AbstractServer implements RemotingServer {
/*xxx: 每次新增暴露服务的时候,都会新生成该实例*/
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
}
@Override
protected void doOpen() throws Throwable {
/*xxx: 省略netty的使用方式...*/
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
}
}
/*xxx: 抽象网络服务端 */
public abstract class AbstractServer extends AbstractEndpoint implements RemotingServer {
//本地端口
private InetSocketAddress localAddress;
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
/*xxx: 获取绑定的ip*/
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
/*xxx: 获取绑定的端口 */
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
bindAddress = new InetSocketAddress(bindIp, bindPort);
/*xxx: 打开服务*/
doOpen();
}
/*xxx: 开启服务端服务*/
protected abstract void doOpen() throws Throwable;
/*xxx: 获取待绑定接口*/
public InetSocketAddress getBindAddress() {
return bindAddress;
}
}
一句话总结:暴露网络端口,客户端传来path,服务端通过path在本地缓存找exporter,缓存位于exporterMap中,且必须有。
# 泛化调用
- 案例
package com.automannn.dubbo.bean;
import java.io.Serializable;
/**
* @author automannn
* @Date 2022/4/20
*/
public class MyDTO implements Serializable {
private String id;
private String name;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "MyDTO{" +
"id='" + id + '\'' +
", name='" + name + '\'' +
'}';
}
}
package com.automannn.dubbo.api;
import com.automannn.dubbo.bean.MyDTO;
/**
* @author chenkh
* @time 2021/7/28
*/
public interface HelloService {
void sayHello(String name);
void sayHi(String name);
String invokeBean(MyDTO myDTO);
}
package com.automannn.dubbo.genericRpc;
import com.automannn.dubbo.api.HelloService;
import com.automannn.dubbo.provider.HelloServiceImpl;
import com.automannn.dubbo.withoutSpring.util.DubboUtils;
import java.util.Scanner;
/**
* @author automannn
* @Date 2022/4/20
*/
public class ApiServiceProvider {
public static void main(String[] args) {
DubboUtils.registerService(HelloService.class,new HelloServiceImpl(),"dubbo-generic-automan-practice-provider","zookeeper://localhost:2181","practice");
Scanner scanner = new Scanner(System.in);
scanner.nextLine();
}
}
package com.automannn.dubbo.genericRpc;
import com.automannn.dubbo.api.HelloService;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.rpc.service.GenericService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* @author automannn
* @Date 2022/4/20
* @Description 泛化调用
*/
public class ApiGenericConsumer {
public static void main(String[] args) {
ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
referenceConfig.setApplication(new ApplicationConfig("my-generic-consumer"));
RegistryConfig registry1 = new RegistryConfig();
registry1.setAddress("zookeeper://localhost:2181");
registry1.setGroup("practice");
List<RegistryConfig> registries = new ArrayList<>();
registries.add(registry1);
referenceConfig.setRegistries(registries);
//xxx: 泛化调用,消费端可以不依赖api接口
referenceConfig.setInterface("com.automannn.dubbo.api.HelloService");
referenceConfig.setVersion("1.0");
referenceConfig.setGroup("practice");
//xxx: 设置为泛化调用
referenceConfig.setGeneric("true");
GenericService genericService = referenceConfig.get();
genericService.$invoke("sayHello",new String[]{"java.lang.String"},new Object[]{"generic Call"});
HashMap<String,Object> param = new HashMap<>();
param.put("id","1");
param.put("name","automannn");
String result = (String) genericService.$invoke("invokeBean",new String[]{"com.automannn.dubbo.bean.MyDTO"},new Object[]{param});
System.out.println(result);
}
}
- 有无API接口依赖
有不有接口依赖,比较大的区别是,可以通过动态代理,将底层的转换屏蔽掉,给用户原生的使用体验,但其底层的转换,依然是逃不掉的。从这个意义上讲,有没有接口依赖,并无大的大别
- 设置泛化调用参数的影响
设置泛化调用后,生成的动态代理,既实现了原本的接口,也实现了泛化接口GenericService,形成的多态调用。换言之,泛化调用是兼容不泛化时的情况的
# 注册中心机制
- 顶层设计
/*xxx: 注册中心服务 */
public interface RegistryService {
/*xxx: 注册服务*/
void register(URL url);
/*xxx: 注销服务*/
void unregister(URL url);
/*xxx: 订阅服务*/
void subscribe(URL url, NotifyListener listener);
/*xxx: 取消订阅*/
void unsubscribe(URL url, NotifyListener listener);
/*xxx: 查找服务*/
List<URL> lookup(URL url);
}
/*xxx: API 或者 SPI 节点特性*/
public interface Node {
/*xxx: 是否可达 */
boolean isAvailable();
}
/*xxx: 注册中心接口 */
public interface Registry extends Node, RegistryService {
/*xxx: 注册服务*/
default void reExportRegister(URL url) {
register(url);
}
/*xxx: 注销服务*/
default void reExportUnregister(URL url) {
unregister(url);
}
}
- 实现设计(zk)
/*xxx: zk实现的注册中心 */
public class ZookeeperRegistry extends FailbackRegistry {
/*xxx: zk客户端,是zk注册中心的关键*/
private final ZookeeperClient zkClient;
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
/*xxx: 默认的zk命名空间:dubbo */
this.root = group;
/*xxx: zk客户端实例 */
zkClient = zookeeperTransporter.connect(url);
/*xxx: zk状态监测回调*/
zkClient.addStateListener((state) -> {
//xxx: 略
});
}
@Override
/*xxx: 节点是否可达*/
public boolean isAvailable() {
return zkClient.isConnected();
}
@Override
/*xxx: 服务注册*/
public void doRegister(URL url) {
/*xxx: zk注册中心,就是创建一个临时节点*/
/*xxx: 注意,这个临时节点的根节点,又称为命名空间,不指定的情况下,为dubbo,这特别重要*/
/*xxx: 尤其是当 当前为消费者时,由于消费者可能开启了check,导致问题不好排查*/
/*xxx: 另外,命名空间与 服务分组,在dubbo的api上面表现出来很相似,但它们是完全不同的两个概念,*/
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
}
@Override
/*xxx: 服务注销,就是删除临时节点*/
public void doUnregister(URL url) {
zkClient.delete(toUrlPath(url));
}
@Override
/*xxx: 节点订阅,节点变化时,收到通知,并做出相应行为*/
public void doSubscribe(final URL url, final NotifyListener listener) {
//略
}
@Override
/*xxx: 取消节点订阅*/
public void doUnsubscribe(URL url, NotifyListener listener) {
//xxx: 略
}
@Override
/*xxx: 查找节点*/
public List<URL> lookup(URL url) {
/*xxx: 获取提供者列表*/
List<String> providers = new ArrayList<>();
/*xxx: 获取特定路径下的,所有子节点信息*/
for (String path : toCategoriesPath(url)) {
List<String> children = zkClient.getChildren(path);
if (children != null) {
providers.addAll(children);
}
}
return toUrlsWithoutEmpty(url, providers);
}
/*xxx: 获取路径参数,如果指定了通配为 *,则会把 提供者,消费者,路由,配置等路径全部包含*/
/*xxx: 否则只包含 提供者的路径*/
private String[] toCategoriesPath(URL url) {
//xxx: 略...
}
/*xxx: 从所有的提供者里面,找到与消费者相匹配的提供者,并返回*/
private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
/*xxx: 先判断协议是否相匹配*/
if (provider.contains(PROTOCOL_SEPARATOR_ENCODED)) {
/*xxx: 将提供者反序列化为url*/
URL url = URLStrParser.parseEncodedStr(provider);
/*xxx: 判断消费者的url与提供者url是否匹配*/
if (UrlUtils.isMatch(consumer, url)) {
urls.add(url);
}
}
return urls;
}
}
- 实现设计(nacos)
/*xxx: nacos注册中心 */
public class NacosRegistry extends FailbackRegistry {
/*xxx: nacos客户端 */
private final NamingService namingService;
public NacosRegistry(URL url, NamingService namingService) {
super(url);
this.namingService = namingService;
}
@Override
/*xxx: 节点可用性检查*/
public boolean isAvailable() {
return "UP".equals(namingService.getServerStatus());
}
@Override
/*xxx: 查找服务*/
public List<URL> lookup(final URL url) {
final List<URL> urls = new LinkedList<>();
execute(namingService -> {
Set<String> serviceNames = getServiceNames(url, null);
for (String serviceName : serviceNames) {
/*xxx: 获取所有的服务实例*/
List<Instance> instances = namingService.getAllInstances(serviceName,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP));
urls.addAll(buildURLs(url, instances));
}
});
return urls;
}
@Override
/*xxx: nacos注册服务*/
public void doRegister(URL url) {
/*xxx: 首先获取url的服务名称 */
final String serviceName = getServiceName(url);
/*xxx: 根据 url创建客户端实例信息 */
final Instance instance = createInstance(url);
/*xxx: 通过nacos的客户端api规范,注册服务,同样又命名空间的概念 */
execute(namingService -> namingService.registerInstance(serviceName,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP), instance));
}
@Override
/*xxx: 注销服务*/
public void doUnregister(final URL url) {
execute(namingService -> {
String serviceName = getServiceName(url);
Instance instance = createInstance(url);
namingService.deregisterInstance(serviceName,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP),
instance.getIp()
, instance.getPort());
});
}
}
- 应用设计-提供端
/*xxx: 注册中心协议 */
public class RegistryProtocol implements Protocol {
@Override
/*xxx: 注册到认证中心 */
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
/*xxx: 本地服务暴露,该方法执行后,实际上已经可以通过url的方式访问服务了 */
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
/*xxx: 注册到注册中心*/
register(registryUrl, registeredProviderUrl);
}
/*xxx: 注册实现,见上方顶层设计*/
private void register(URL registryUrl, URL registeredProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
protocol.export(invokerDelegate);
//xxx: 省略其他抽象...
}
}
- 应用设计-消费端
/*xxx:注册中心路径抽象 */
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
/*xxx: 消费端通过订阅提供端节点信息,动态更新 invokers */
private void refreshInvoker(List<URL> invokerUrls) {
/*xxx: 直接根据url 转换为 Invoker*/
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
/*xxx: routerChain中的invoker设置好后,后续便可以直接根据它调用*/
routerChain.setInvokers(newInvokers);
}
/*xxx: 根据url构造invoker */
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
/*xxx: 遍历提供者地址*/
for (URL providerUrl : urls) {
/*xxx: 根据url,构建invoker*/
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
}
}
# 完整调用机制
# 服务注册流程
/*xxx: 服务提供者配置,单独服务暴露时,可以以此为起点进行查看 */
/*xxx: 它 具有基类配置的特性 */
public class ServiceConfig<T> extends ServiceConfigBase<T> {
public synchronized void export() {
/*xxx: service自带初始上下文特性 */
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.initialize();
}
/*xxx: 实际的服务暴露方法*/
doExport();
}
protected synchronized void doExport() {
doExportUrls();
}
/*xxx: 本地服务托管,以及特定的服务协议路径映射 */
private void doExportUrls() {
/*xxx; 获取 服务容器*/
/*xxx: 远程调用的本质: 路径->接口->实现类*/
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
/*xxx: 绑定接口跟实现关系 */
repository.registerProvider(
getUniqueServiceName(),
ref,
//xxx: serviceProvider中,具有接口信息
serviceDescriptor,
this,
serviceMetadata
);
/*xxx: 从当前的服务配置中,获取 注册中心地址信息*/
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
/*xxx: 遍历服务暴露协议,并向指定注册中心注册,同时,将暴露的服务路径,与接口映射起来*/
for (ProtocolConfig protocolConfig : protocols) {
/*xxx: 构造 键名 */
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
/*xxx: 将协议路径,与接口映射起来*/
repository.registerService(pathKey, interfaceClass);
//xxx: 注册中心注册节点,本质上就是注册一串文本信息,从这个意义上讲,注册中心不那么玄妙
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
/*xxx: 为某种协议,注册节点 */
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
/*xxx: 获取协议名称,没有指定的情况下,使用dubbo */
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
Map<String, String> map = new HashMap<String, String>();
/*xxx: 设置应用的类型,为提供方还是消费方 */
map.put(SIDE_KEY, PROVIDER_SIDE);
/*xxx: 处理方法集合,将所有的方法以及参数类型,序列化为文本,并作为 属性,放入map中*/
if (CollectionUtils.isNotEmpty(getMethods())) {
//xxx: 略...
}
/*xxx: 是否是泛化服务,如果是泛化服务,则methods的值为*,否则为所有的方法名字符串 */
if (ProtocolUtils.isGeneric(generic)) {
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
/*xxx: 获取本机暴露服务的ip 及 端口 信息*/
String host = findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = findConfigedPorts(protocolConfig, name, map);
/*xxx: 根据本机信息,构建 url*/
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
/*xxx: 生成Invoker,该 invoker 是一个实现了接口的动态代理类 */
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
/*xxx: 生成exporter,在此时,服务已经对外进行了暴露*/
/*xxx: 此时的URL为: registry://localhost:2181,换言之,协议为registry.如果此时没有注册中心,则时dubbo协议*/
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
/*xxx: 并将暴露的服务信息缓存起来 */
exporters.add(exporter);
}
}
服务暴露,最核心的方法就在于Exporter<?> exporter = PROTOCOL.export(wrapperInvoker)
# 默认协议的实现
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
injvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
http=org.apache.dubbo.rpc.protocol.http.HttpProtocol
registry=org.apache.dubbo.registry.integration.RegistryProtocol
qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper
实际的暴露链为: qos->filter->listener->registry(注册中心协议内部调用dubbo) ---> qos->filter->listener->dubbo 具体协议的包装过程,由ExtentionLoader实现,略
# 服务调用流程
/*xxx: 消费者接口层配置*/
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
/*xxx: 获取消费者代理*/
public synchronized T get() {
init();
return ref;
}
public synchronized void init() {
/*xxx: 初始化dubboBootstrap */
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.init();
}
/*xxx: 设置服务的类型,为消费者端*/
map.put(SIDE_KEY, CONSUMER_SIDE);
/*xxx: 获取dubbo服务的ip地址*/
String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
/*xxx: 如果没有在系统层面进行配置,则获取本机的地址*/
if (StringUtils.isEmpty(hostToRegistry)) {
hostToRegistry = NetUtils.getLocalHost();
}
/*xxx: 根据接口信息,创建服务代理 */
ref = createProxy(map);
}
/*xxx: 根据服务元数据,创建代理 */
private T createProxy(Map<String, String> map) {
/*xxx: 根据实际的协议进行引用 */
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
}
}
本质上,通过注册中心协议实现负载均衡,通过其他应用协议如dubbo,grpc实现远程调用
/*xxx: 注册中心协议 */
public class RegistryProtocol implements Protocol {
@Override
/*xxx: 注册中心引用 */
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
Registry registry = registryFactory.getRegistry(url);
/*xxx: 获取提供者集群,没指定的情况下,使用 MockCluster*/
Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
/*xxx: 消费者是否需要注册 */
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
/*xxx: 将消费者进行注册*/
registry.register(directory.getRegisteredConsumerUrl());
}
/*xxx: 订阅生产者节点 */
directory.subscribe(toSubscribeUrl(subscribeUrl));
/*xxx: 获取代理 */
Invoker<T> invoker = cluster.join(directory);
}
}
/*xxx: 模拟集群实现*/
public class MockClusterWrapper implements Cluster {
private Cluster cluster;
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MockClusterInvoker<T>(directory,
this.cluster.join(directory));
}
}
public class FailoverCluster extends AbstractCluster {
public final static String NAME = "failover";
@Override
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<>(directory);
}
}
public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {
@Override
public Result invoke(final Invocation invocation) throws RpcException {
/*xxx: 从注册中心,获取提供者列表*/
List<Invoker<T>> invokers = list(invocation);
/*xxx: 构造负载均衡器*/
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
return doInvoke(invocation, invokers, loadbalance);
}
protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
LoadBalance loadbalance) throws RpcException;
/*xxx: 列出提供者列表*/
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
return directory.list(invocation);
}
}
/*xxx: 默认负载均衡策略: 错误重试*/
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
@Override
/*xxx: 根据负载均衡策略,以及重试次数进行调用 */
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
//xxx: 略...
}
}
/*xxx: 目录接口 */
public interface Directory<T> extends Node {
/*xxx: 列出 invocation的invokers列表 */
List<Invoker<T>> list(Invocation invocation) throws RpcException;
}
/*xxx: 抽象路径*/
public abstract class AbstractDirectory<T> implements Directory<T> {
@Override
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
return doList(invocation);
}
protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
}
/*xxx:注册中心路径抽象 */
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
@Override
public List<Invoker<T>> doList(Invocation invocation) {
List<Invoker<T>> invokers = routerChain.route(getConsumerUrl(), invocation);
return invokers;
}
/*xxx: 消费端通过订阅提供端节点信息,动态更新 invokers */
private void refreshInvoker(List<URL> invokerUrls) {
/*xxx: 直接根据url 转换为 Invoker*/
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
/*xxx: routerChain中的invoker设置好后,后续便可以直接根据它调用*/
routerChain.setInvokers(newInvokers);
}
/*xxx: 根据url构造invoker */
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
/*xxx: 遍历提供者地址*/
for (URL providerUrl : urls) {
/*xxx: 根据url,构建invoker*/
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
}
}
/*xxx: 抽象协议*/
public abstract class AbstractProtocol implements Protocol {
@Override
/*xxx: 引用远程服务*/
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
/*xxx: 使用异步转同步的 装饰执行器 */
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
/*xxx: 通过协议引用远程服务*/
protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException;
}
/*xxx: dubbo协议,实现远程服务调用 */
public class DubboProtocol extends AbstractProtocol {
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
/*xxx: 创建一个 dubboInvoker */
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
}
}
/*xxx: dubbo执行器 */
public class DubboInvoker<T> extends AbstractInvoker<T> {
@Override
/*xxx: dubbo执行器,实际执行的方法,默认情况下,是一个异步的请求,通过future完成*/
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
ExchangeClient currentClient;
//xxx: 发送远程请求
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
//xxx: 省略其他抽象...
}
}
# 服务容器
/*xxx: 服务容器,仅用于本地便捷获取使用,远程调用时不涉及 */
public class ServiceRepository extends LifecycleAdapter implements FrameworkExt {
/*xxx: 存储服务信息*/
private ConcurrentMap<String, ServiceDescriptor> services = new ConcurrentHashMap<>();
/*xxx: 存储消费者信息 */
private ConcurrentMap<String, ConsumerModel> consumers = new ConcurrentHashMap<>();
/*xxx; 存储提供者信息*/
private ConcurrentMap<String, ProviderModel> providers = new ConcurrentHashMap<>();
//xxx: 略...
}
# spring集成原理
/*xxx: dubbo与spring提供了原生集成*/
public class DubboNamespaceHandler extends NamespaceHandlerSupport implements ConfigurableSourceBeanMetadataElement {
@Override
public void init() {
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
}
//xxx: 省略其他抽象...
}
/*xxx: service工厂*/
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
ApplicationContextAware, BeanNameAware, ApplicationEventPublisherAware {
//xxx:省略抽象...
}
/*xxx: 抽象注解bean后置处理器: ServiceBean调用export,ReferenceBean调用refer */
public abstract class AbstractAnnotationBeanPostProcessor extends
InstantiationAwareBeanPostProcessorAdapter implements MergedBeanDefinitionPostProcessor, PriorityOrdered,
BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware, DisposableBean {
//xxx: 略...
}
# dubbo扩展点专题
ExtensionLoader
,ExtensionFactory
,@SPI
,@Adaptive
,@Activate
都是dubbo框架的扩展点机制,用于实现dubbo的可扩展性
# ExtensionLoader
- 扩展点加载器,负责加载Dubbo的扩展点接口实现;
- 动态代理的过程是由其实现的。
# 功能
- 加载扩展点接口的所有实现
- 缓存扩展点接口的所有实现
- 获取指定名称的扩展点实现
- 获取自适应扩展点实现
# 实例
- 配置文件
META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=org.apache.dubbo.rpc.support.MockProtocol
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
http=org.apache.dubbo.rpc.protocol.http.HttpProtocol
rmi=org.apache.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=org.apache.dubbo.rpc.protocol.hessian.HessianProtocol
org.apache.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=org.apache.dubbo.rpc.protocol.thrift.ThriftProtocol
native-thrift=org.apache.dubbo.rpc.protocol.nativethrift.ThriftProtocol
memcached=org.apache.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=org.apache.dubbo.rpc.protocol.redis.RedisProtocol
rest=org.apache.dubbo.rpc.protocol.rest.RestProtocol
xmlrpc=org.apache.dubbo.xml.rpc.protocol.xmlrpc.XmlRpcProtocol
grpc=org.apache.dubbo.rpc.protocol.grpc.GrpcProtocol
registry=org.apache.dubbo.registry.integration.RegistryProtocol
service-discovery-registry=org.apache.dubbo.registry.client.ServiceDiscoveryRegistryProtocol
qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper
package com.automannn.dubbo.extensionLoader;
import org.apache.dubbo.cache.CacheFactory;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.rpc.Protocol;
import java.util.ArrayList;
import java.util.Set;
/**
* @author automannn
* @Date 2022/4/18
*/
public class ExtenSionLoaderTest {
public static void main(String[] args) {
ExtensionLoader loader = ExtensionLoader.getExtensionLoader(Protocol.class);
Set<String> supportedExtensions = loader.getSupportedExtensions();
ArrayList<String> extensionList = new ArrayList<>();
//获取所有的扩展点实现
for (String supportedExtension : supportedExtensions) {
extensionList.add(supportedExtension);
}
System.out.println(extensionList.size());
System.out.println(extensionList);
//获取指定名称的实现类
System.out.println(loader.getExtension("http"));
//获取自适应实现类
System.out.println(loader.getAdaptiveExtension());
}
}
16
[dubbo, grpc, hessian, http, injvm, memcached, mock, native-thrift, redis, registry, rest, rmi, service-discovery-registry, thrift, webservice, xmlrpc]
org.apache.dubbo.qos.protocol.QosProtocolWrapper@1efed156
org.apache.dubbo.rpc.Protocol$Adaptive@9660f4e
# ExtensionFactory
- dubbo扩展点的自适应工厂
- 用于根据指定的名称和类型,获取对应的扩展点实例
- 它是通过
AdaptiveExtensionFactory
实现的
# 功能
- 根据类型,获取对应的扩展点实例;
- 根据名称和类型,获取对应的扩展点实例;
# 实例
package com.automannn.dubbo.extensionFactoryTest;
import org.apache.dubbo.common.extension.ExtensionFactory;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.rpc.Protocol;
/**
* @author automannn
* @Date 2023/5/23
*/
public class ExtensionFactoryTest {
public static void main(String[] args) {
//对于扩展点工厂, 扩展点加载器会特殊处理
ExtensionFactory extensionFactory = ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension();
//对于扩展点工厂, 扩展点加载器会特殊处理
System.out.println(extensionFactory);
//获取对象的时候,分别从 spi工厂,spring工厂获取,
// 其中,spi工厂不论传入生命名称,都是获取的 接口的兼容扩展点实现
System.out.println(extensionFactory.getExtension(Protocol.class,"dubbo"));
}
}
org.apache.dubbo.common.extension.factory.AdaptiveExtensionFactory@35851384
org.apache.dubbo.rpc.Protocol$Adaptive@35d176f7
# @SPI
- 扩展点机制的核心注解,用于标记Dubbo扩展点接口;
- 用于指定Dubbo扩展点的默认实现类名称,如果用户没有指定其它实现,则将使用默认实现;
# 实例
- 配置文件
META-INF/services/com.automannn.dubbo.handleExtention.HandlerTest
my=com.automannn.dubbo.handleExtention.MyHandlerTest
other=com.automannn.dubbo.handleExtention.OtherHandler
filter=com.automannn.dubbo.handleExtention.FilterHandler
@SPI("my")
public interface HandlerTest {
void handle(String msg);
}
public class MyHandlerTest implements HandlerTest{
@Override
public void handle(String msg) {
System.out.println("myHandler");
}
}
public class OtherHandler implements HandlerTest{
@Override
public void handle(String msg) {
}
}
package com.automannn.dubbo.SpiTest;
import com.automannn.dubbo.handleExtention.HandlerTest;
import org.apache.dubbo.common.extension.ExtensionLoader;
/**
* @author automannn
* @Date 2023/5/23
*/
public class SpiTest {
public static void main(String[] args) {
HandlerTest handlerTest = ExtensionLoader.getExtensionLoader(HandlerTest.class).getExtension("other");
HandlerTest defaultExtension = ExtensionLoader.getExtensionLoader(HandlerTest.class).getDefaultExtension();
System.out.println(handlerTest);
System.out.println(defaultExtension);
}
}
com.automannn.dubbo.handleExtention.OtherHandler@2a18f23c
com.automannn.dubbo.handleExtention.MyHandlerTest@d7b1517
# @Adaptive
- Dubbo扩展点机制中的自适应扩展点注解,用于标记Dubbo扩展点接口的自适应扩展点实现;
- 根据方法的参数类型,或者注解,来决定调用哪个扩展点实现;
# 实例
- 配置文件
META-INF/services/com.automannn.dubbo.handleExtention.HandlerTest
my=com.automannn.dubbo.handleExtention.MyHandlerTest
other=com.automannn.dubbo.handleExtention.OtherHandler
filter=com.automannn.dubbo.handleExtention.FilterHandler
adaptive=com.automannn.dubbo.handleExtention.TheAdaptiveHandler
package com.automannn.dubbo.adaptiveTest;
import com.automannn.dubbo.handleExtention.HandlerTest;
import org.apache.dubbo.common.extension.ExtensionLoader;
/**
* @author automannn
* @Date 2023/5/23
*/
public class AdaptiveTest {
public static void main(String[] args) {
HandlerTest adaptiveHandler = ExtensionLoader.getExtensionLoader(HandlerTest.class).getAdaptiveExtension();
System.out.println(adaptiveHandler);
}
}
com.automannn.dubbo.handleExtention.TheAdaptiveHandler@d7b1517
# @Activate
- Bubbo扩展点机制中的自动激活扩展点注解,用于标记Dubbo扩展点接口的自动激活扩展点实现;
- 用于在加载扩展点实现时,自动激活符合条件的扩展点实现;
# 实例
- 配置文件
META-INF/services/com.automannn.dubbo.handleExtention.HandlerTest
my=com.automannn.dubbo.handleExtention.MyHandlerTest
other=com.automannn.dubbo.handleExtention.OtherHandler
filter=com.automannn.dubbo.handleExtention.FilterHandler
adaptive=com.automannn.dubbo.handleExtention.TheAdaptiveHandler
filterA=com.automannn.dubbo.handleExtention.FilterAHandler
filterB=com.automannn.dubbo.handleExtention.FilterBHandler
package com.automannn.dubbo.handleExtention;
import org.apache.dubbo.common.extension.Activate;
/**
* @author automannn
* @Date 2023/5/23
*/
@Activate(group = "test",value="auto:testb",order = 4)
public class FilterBHandler implements HandlerTest{
@Override
public void handle(String msg) {
System.out.println("filterB");
}
}
package com.automannn.dubbo.handleExtention;
import org.apache.dubbo.common.extension.Activate;
/**
* @author automannn
* @Date 2023/5/23
*/
@Activate(group = "test",order = 1)
public class FilterAHandler implements HandlerTest{
@Override
public void handle(String msg) {
System.out.println("filterA");
}
}
package com.automannn.dubbo.activateTest;
import com.automannn.dubbo.handleExtention.HandlerTest;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import java.util.List;
/**
* @author automannn
* @Date 2023/5/23
*/
public class ActivatedTest {
public static void main(String[] args) {
URL url = URL.valueOf("dubbo://127.0.0.1:25088?name=filterA&auto=testb");
List<HandlerTest> adaptiveHandlerList = ExtensionLoader.getExtensionLoader(HandlerTest.class).getActivateExtension(url,"name","test");
URL url1 = URL.valueOf("dubbo://127.0.0.1:25088?name=filterA&auto=testa");
List<HandlerTest> adaptiveHandlerList1 = ExtensionLoader.getExtensionLoader(HandlerTest.class).getActivateExtension(url1,"name","test");
System.out.println(adaptiveHandlerList);
System.out.println(adaptiveHandlerList1);
}
}
[com.automannn.dubbo.handleExtention.FilterBHandler@d7b1517, com.automannn.dubbo.handleExtention.FilterAHandler@16c0663d]
[com.automannn.dubbo.handleExtention.FilterAHandler@16c0663d]
# dubbo消费者专题
# 初始化的触发
# 注解处理环境
- 基础设置
public abstract class InstantiationAwareBeanPostProcessorAdapter implements SmartInstantiationAwareBeanPostProcessor {
@Deprecated
@Override
public PropertyValues postProcessPropertyValues(
PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeansException {
return pvs;
}
}
public abstract class AbstractAnnotationBeanPostProcessor extends
InstantiationAwareBeanPostProcessorAdapter {
@Override
/*xxx: 后置处理 (核心方法)*/
public PropertyValues postProcessPropertyValues(
PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException {
InjectionMetadata metadata = findInjectionMetadata(beanName, bean.getClass(), pvs);
/*xxx: 对需要注入的元数据进行注入*/
metadata.inject(bean, beanName, pvs);
}
/*xxx: 查找需要注解注入的元数据*/
private InjectionMetadata findInjectionMetadata(String beanName, Class<?> clazz, PropertyValues pvs) {
/*xxx: 构建元数据*/
return buildAnnotatedMetadata(clazz);
}
private AnnotatedInjectionMetadata buildAnnotatedMetadata(final Class<?> beanClass) {
/*xxx: 通过属性查找需要注入的属性*/
Collection<AnnotatedFieldElement> fieldElements = findFieldAnnotationMetadata(beanClass);
/*xxx: 通过方法查找需要注入的属性*/
Collection<AnnotatedMethodElement> methodElements = findAnnotatedMethodMetadata(beanClass);
return new AnnotatedInjectionMetadata(beanClass, fieldElements, methodElements);
}
/*xxx: 查找根据属性注入的方法*/
private List<AnnotatedFieldElement> findFieldAnnotationMetadata(final Class<?> beanClass) {
final List<AnnotatedFieldElement> elements = new LinkedList<AnnotatedFieldElement>();
/*xxx: 遍历需要处理的注解*/
for (Class<? extends Annotation> annotationType : getAnnotationTypes()) {
/*xxx: 获取注解中的属性*/
AnnotationAttributes attributes = getAnnotationAttributes(field, annotationType, getEnvironment(), true, true);
/*xxx: 构造成 注解元素配置信息,并保存 */
elements.add(new AnnotatedFieldElement(field, attributes));
}
}
}
/*xxx: dubbo的 @reference 注解后置处理器 */
public class ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor implements
ApplicationContextAware {
/*xxx: 注解后置处理器处理的注解类型 */
public ReferenceAnnotationBeanPostProcessor() {
super(DubboReference.class, Reference.class, com.alibaba.dubbo.config.annotation.Reference.class);
}
}
- 自动注入的处理(核心)
/*xxx: 为方法注入属性*/
private class AnnotatedMethodElement extends InjectionMetadata.InjectedElement {
@Override
protected void inject(Object bean, String beanName, PropertyValues pvs) throws Throwable {
Object injectedObject = getInjectedObject(attributes, bean, beanName, injectedType, this);
/*xxx: 方法的注入,通过反射进行*/
method.invoke(bean, injectedObject);
}
/*xxx: 从指定的注解或者 bean中,获取 注入对象*/
//xxx: 这个方法位于 AbstractAnnotationBeanPostProcessor 中
protected Object getInjectedObject(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
InjectionMetadata.InjectedElement injectedElement) throws Exception {
return doGetInjectedBean(attributes, bean, beanName, injectedType, injectedElement);
}
/*xxx: 子类通过该方法获取注入的对象*/
protected abstract Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
InjectionMetadata.InjectedElement injectedElement) throws Exception;
}
/*xxx: dubbo的 @reference 注解后置处理器 */
public class ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor implements
ApplicationContextAware {
@Override
/*xxx: 通过该方法获取注入的对象*/
protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
InjectionMetadata.InjectedElement injectedElement) throws Exception {
/*xxx: 获取引用的bean的名称 */
String referenceBeanName = getReferenceBeanName(attributes, injectedType);
ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType);
/*xxx: 验证是否是本地的bean */
boolean localServiceBean = isLocalServiceBean(referencedBeanName, referenceBean, attributes);
/*xxx: referenceBean前置准备*/
prepareReferenceBean(referencedBeanName, referenceBean, localServiceBean);
/*xxx: 注册 referenceBean到上下文 */
registerReferenceBean(referencedBeanName, referenceBean, attributes, localServiceBean, injectedType);
/*xxx: 为 referenceBean注入缓存信息*/
cacheInjectedReferenceBean(referenceBean, injectedElement);
return referenceBean.get();
}
}
# bean环境
/*xxx: reference工厂*/
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean,
ApplicationContextAware, InitializingBean, DisposableBean {
@Override
public Object getObject() {
return get();
}
}
public abstract class ReferenceConfigBase<T> extends AbstractReferenceConfig {
/*xxx: 获取代理*/
public abstract T get();
public abstract void destroy();
}
# 初始化流程
# 基础设施
public abstract class ReferenceConfigBase<T> extends AbstractReferenceConfig {
/*xxx: 获取代理*/
public abstract T get();
public abstract void destroy();
}
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
//xxx: 实际的接口代理对象
private transient volatile T ref;
public synchronized T get() {
/*xxx: referenceConfig也有ref的关键属性,作用类似于ServiceConfig中的ref,实际上是一个代理对象*/
if (ref == null) {
/*xxx: 初始化*/
init();
}
return ref;
}
public synchronized void init() {
/*xxx: 初始化dubboBootstrap */
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.init();
}
Map<String, String> map = new HashMap<String, String>();
/*xxx: 设置服务的类型,为消费者端*/
map.put(SIDE_KEY, CONSUMER_SIDE);
/*xxx: 获取消费者接口的所有方法 */
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
map.put("interface", interfaceName);
/*xxx: 设置注册到注册中心的本地地址 */
String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
/*xxx: 如果没有在系统层面进行配置,则获取本机的地址*/
if (StringUtils.isEmpty(hostToRegistry)) {
hostToRegistry = NetUtils.getLocalHost();
}
map.put(REGISTER_IP_KEY, hostToRegistry);
/*xxx: 根据接口信息,创建服务代理 */
/*xxx: 核心*/
ref = createProxy(map);
/*xxx: 检测服务是否正常注册 */
checkInvokerAvailable();
}
}
# 创建代理
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
/*xxx: 代理工厂扩展点*/
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
/*xxx: 根据服务元数据,创建代理 */
private T createProxy(Map<String, String> map) {
/*xxx: 根据实际的协议进行引用 */
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
/*xxx: 检测注册中心*/
checkRegistry();
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
}
# 协议调用
/*xxx: 注册中心协议 */
public class RegistryProtocol implements Protocol {
@Override
/*xxx: 注册中心引用 */
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
Registry registry = registryFactory.getRegistry(url);
/*xxx: 获取提供者集群,没指定的情况下,使用 MockCluster*/
Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
/*xxx: 消费者是否需要注册 */
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
/*xxx: 将消费者进行注册*/
registry.register(directory.getRegisteredConsumerUrl());
}
/*xxx: 订阅生产者节点 */
directory.subscribe(toSubscribeUrl(subscribeUrl));
/*xxx: 获取代理 */
Invoker<T> invoker = cluster.join(directory);
}
}
# 注册中心获取
- 工厂配置,位于
META-INF/dubbo/com.alibaba.dubbo.registry.RegistryFactory
spring-cloud=com.alibaba.cloud.dubbo.registry.SpringCloudRegistryFactory
@SPI("dubbo")
public interface RegistryFactory {
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
public abstract class AbstractRegistryFactory implements RegistryFactory {
@Override
public Registry getRegistry(URL url) {
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
registry = createRegistry(url);
}
protected abstract Registry createRegistry(URL url);
}
/*xxx: 基于springCloud的注册中心 */
public class SpringCloudRegistryFactory extends AbstractRegistryFactory {
private DiscoveryClient discoveryClient;
@Override
/*xxx: 创建注册中心*/
public Registry createRegistry(URL url) {
//xxx: 初始化
init();
/*xxx: 使用dubboCloud作为注册中心实现 "*/
registry = new DubboCloudRegistry(url, discoveryClient,
dubboServiceMetadataRepository, dubboMetadataConfigServiceProxy,
jsonUtils, applicationContext);
}
}
# 服务可达性检测
/*xxx: 消费者接口层配置*/
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
/*xxx: 创建服务消费者是否正常进行了订阅*/
private void checkInvokerAvailable() throws IllegalStateException {
if (shouldCheck() && !invoker.isAvailable()) {
invoker.destroy();
throw new IllegalStateException("Failed to check the status of the service "
+ interfaceName
+ ". No provider available for the service "
+ (group == null ? "" : group + "/")
+ interfaceName +
(version == null ? "" : ":" + version)
+ " from the url "
+ invoker.getUrl()
+ " to the consumer "
+ NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
}
}
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
@Override
public boolean isAvailable() {
if (isDestroyed()) {
return false;
}
Map<String, Invoker<T>> localUrlInvokerMap = urlInvokerMap;
if (localUrlInvokerMap != null && localUrlInvokerMap.size() > 0) {
for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) {
if (invoker.isAvailable()) {
return true;
}
}
}
return false;
}
}
# 消费者订阅
/*xxx: 注册中心协议 */
public class RegistryProtocol implements Protocol {
@Override
/*xxx: 注册中心引用 */
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
URL subscribeUrl = new URL("consumer", parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
/*xxx: 订阅生产者节点 */
directory.subscribe(toSubscribeUrl(subscribeUrl));
//xxx: 注意,是先订阅,再获取的代理
/*xxx: 获取代理 */
Invoker<T> invoker = cluster.join(directory);
}
}
/*xxx:注册中心路径抽象 */
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
public void subscribe(URL url) {
registry.subscribe(url, this);
}
@Override
public synchronized void notify(List<URL> urls) {
/*xxx: 获取提供者url信息*/
List<URL> providerURLs = categoryUrls.getOrDefault("providers", Collections.emptyList());
/*xxx: 根据提供者的url, 更新invoker信息*/
refreshOverrideAndInvoker(providerURLs);
}
private void refreshOverrideAndInvoker(List<URL> urls) {
refreshInvoker(urls);
}
/*xxx: 通过订阅节点信息,动态更新 invokers */
private void refreshInvoker(List<URL> invokerUrls) {
/*xxx: 将url 转换为 Invoker*/
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
this.urlInvokerMap = newUrlInvokerMap;
}
}
# dubboCloud专题
# 自动配置项
# 元数据自动配置
@Import({ DubboServiceMetadataRepository.class, IntrospectiveDubboMetadataService.class,DubboMetadataServiceProxy.class })
/*xxx: dubbo元数据自动配置 */
public class DubboMetadataAutoConfiguration {
}
# 注册中心
/*xxx: springCloud-dubbo的 注册中心配置*/
public class DubboServiceRegistrationAutoConfiguration {
@Bean
/*xxx: 如果自定义配置中,没有配置 spring-cloud注册中心,则要将该注册中心进行注册 */
@Conditional({ MissingSpringCloudRegistryConfigPropertyCondition.class })
/*xxx: 进行 springCloud 注册中心配置*/
/*xxx: spirngCloud的核心操作 */
public RegistryConfig defaultSpringCloudRegistryConfig() {
return new RegistryConfig(ADDRESS, PROTOCOL);
}
@EventListener(ServiceInstancePreRegisteredEvent.class)
/*xxx: 服务实例预注册 */
public void onServiceInstancePreRegistered(ServiceInstancePreRegisteredEvent event) {
attachDubboMetadataServiceMetadata(registration);
}
/*xxx: 添加dubbo服务元数据 */
private void attachDubboMetadataServiceMetadata(Registration registration) {
if (registration == null) {
return;
}
synchronized (registration) {
/*xxx: 获取注册表元数据 */
Map<String, String> metadata = registration.getMetadata();
attachDubboMetadataServiceMetadata(metadata);
}
}
private void attachDubboMetadataServiceMetadata(Map<String, String> metadata) {
/*xxx: 获取dubbo服务 元数据*/
Map<String, String> serviceMetadata = dubboServiceMetadataRepository
.getDubboMetadataServiceMetadata();
if (!isEmpty(serviceMetadata)) {
/*xxx: 将dubbo服务 元数据保存至注册表元数据中 */
metadata.putAll(serviceMetadata);
}
}
}
# 泛型工厂配置
@EnableConfigurationProperties(DubboCloudProperties.class)
/*xxx: 服务自动注册*/
public class DubboServiceAutoConfiguration {
@Bean
@ConditionalOnMissingBean
/*xxx: 泛型服务工厂*/
public DubboGenericServiceFactory dubboGenericServiceFactory() {
return new DubboGenericServiceFactory();
}
}
# 服务自动发现
/*xxx: 服务自动发现配置 */
public class DubboServiceDiscoveryAutoConfiguration {
@EventListener(HeartbeatEvent.class)
public void onHeartbeatEvent(HeartbeatEvent event) {
Stream<String> subscribedServices = dubboServiceMetadataRepository
.initSubscribedServices();
heartbeatEventChangedPredicate.ifAvailable(predicate -> {
if (predicate.test(event)) {
//xxx: 分发服务实例变更事件
subscribedServices.forEach(serviceName -> {
List<ServiceInstance> serviceInstances = getInstances(serviceName);
dispatchServiceInstancesChangedEvent(serviceName, serviceInstances);
});
}
});
}
}
# 注册中心配置原理
# dubboCloud注册中心工厂
- 配置文件
META-INF/dubbo/com.alibaba.dubbo.registry.RegistryFactory
spring-cloud=com.alibaba.cloud.dubbo.registry.SpringCloudRegistryFactory
/*xxx: 基于springCloud的注册中心 */
public class SpringCloudRegistryFactory extends AbstractRegistryFactory {
/*xxx: 注册中心初始化 */
protected void init() {
/*xxx: 服务发现客户端*/
this.discoveryClient = applicationContext.getBean(DiscoveryClient.class);
/*xxx: dubbo服务元数据仓库 */
this.dubboServiceMetadataRepository = applicationContext
.getBean(DubboServiceMetadataRepository.class);
}
@Override
/*xxx: 创建注册中心*/
public Registry createRegistry(URL url) {
//xxx: 初始化
init();
/*xxx: 使用dubboCloud作为注册中心实现 "*/
registry = new DubboCloudRegistry(url, discoveryClient,
dubboServiceMetadataRepository, dubboMetadataConfigServiceProxy,
jsonUtils, applicationContext);
return registry;
}
}
# dubboCloud注册中心
/*xxx: dubboCloud注册中心*/
public class DubboCloudRegistry extends FailbackRegistry
implements ApplicationListener<ServiceInstancesChangedEvent> {
/*xxx: 服务发现客户端 */
private final DiscoveryClient discoveryClient;
/*xxx: dubbo元数据仓库, 所有的操作,都是围绕其进行 */
private final DubboServiceMetadataRepository repository;
@Override
/*xxx: 注册服务 */
public final void doRegister(URL url) {
synchronized (this) {
/*xxx: 初始化*/
preInit();
if (shouldNotRegister(url)) {
return;
}
/*xxx: 生产者进行注册, 注册到元数据仓库中的(注册到本地)*/
repository.exportURL(url);
}
}
private void preInit() {
//xxx: 如果初始化时,已经存在订阅处理器实例,则进行初始化
urlSubscribeHandlerMap.forEach((url, handler) -> handler.init());
/*xxx: 初始化元数据*/
repository.initializeMetadata();
/*xxx: 监听实例变更事件 */
applicationContext.addApplicationListener(this);
}
@Override
//xxx: 消费者操作的端口,消费者通过 注册中心进行订阅
public final void doSubscribe(URL url, NotifyListener listener) {
synchronized (this) {
preInit();
/*xxx: 如果是监控订阅*/
if (isAdminURL(url)) {
//xxx: 略
}else if (isDubboMetadataServiceURL(url) && containsProviderCategory(url)) {
String appName = getServiceName(url);
/*xxx: 如果是元数据订阅*/
MetadataServiceSubscribeHandler handler = new MetadataServiceSubscribeHandler(
appName, url, listener, this, dubboMetadataUtils);
if (inited.get()) {
handler.init();
}
metadataSubscribeHandlerMap.put(appName, handler);
}/*xxx: 如果是客户端订阅, 则所有的代理实现,由 GeneralServiceSubsribeHandler实现, 可以类比 dubbo的 泛型服务*/
else if (isConsumerServiceURL(url)) {
GenearalServiceSubscribeHandler handler = new GenearalServiceSubscribeHandler(
url, listener, this, repository, jsonUtils,
dubboMetadataConfigServiceProxy);
if (inited.get()) {
/*xxx: 进行初始化*/
handler.init();
}
/*xxx: 将路径 与 订阅处理器实例进行绑定*/
urlSubscribeHandlerMap.put(url, handler);
}
}
}
}
# 元数据仓库与元数据服务
# 元数据仓库
@Repository
/*xxx: 元数据仓库 */
public class DubboServiceMetadataRepository
implements SmartInitializingSingleton, ApplicationEventPublisherAware {
/*xxx: 初始化元数据*/
public void initializeMetadata() {
doGetSubscribedServices().forEach(this::initializeMetadata);
}
public void initializeMetadata(String serviceName) {
/*xxx: 初始化dubbo的rest服务元数据*/
initDubboRestServiceMetadataRepository(serviceName);
}
protected void initDubboRestServiceMetadataRepository(String serviceName) {
/*xxx: 获取服务实例的元数据信息 */
Set<ServiceRestMetadata> serviceRestMetadataSet = getServiceRestMetadataSet(
serviceName);
for (ServiceRestMetadata serviceRestMetadata : serviceRestMetadataSet) {
/*xxx: 将dubboServiceMetadata的服务元数据 写入 微服务实例的元数据中*/
serviceRestMetadata.getMeta().forEach(restMethodMetadata -> {
RequestMetadata requestMetadata = restMethodMetadata.getRequest();
RequestMetadataMatcher matcher = new RequestMetadataMatcher(
requestMetadata);
DubboRestServiceMetadata metadata = new DubboRestServiceMetadata(
serviceRestMetadata, restMethodMetadata);
/*xxx: 保存服务元数据信息 */
metadataMap.put(matcher, metadata);
});
}
}
/*xxx: 注册服务到注册中心(暴露服务)*/
public void exportURL(URL url) {
URL actualURL = url;
/*xxx: 将ip 进行拆分 */
InetUtils.HostInfo hostInfo = inetUtils.findFirstNonLoopbackHostInfo();
String ipAddress = hostInfo.getIpAddress();
if (!Objects.equals(url.getHost(), ipAddress)) {
actualURL = url.setHost(ipAddress);
}
/*xxx: 添加服务信息, 通过将serviceKey 与 url进行映射存储 */
/*xxx: serviceKey 由接口名+组名+版本号 组成*/
this.allExportedURLs.add(actualURL.getServiceKey(), actualURL);
}
}
# 元数据服务
public class IntrospectiveDubboMetadataService implements DubboMetadataService {
@Autowired
/*xxx: 元数据仓库*/
private ObjectProvider<DubboServiceMetadataRepository> dubboServiceMetadataRepository;
@Override
/*xxx: 获取服务提供者信息*/
public String getExportedURLs(String serviceInterface, String group, String version) {
List<URL> urls = getRepository().getExportedURLs(serviceInterface, group,
version);
return jsonUtils.toJSON(urls);
}
private DubboServiceMetadataRepository getRepository() {
return dubboServiceMetadataRepository.getIfAvailable();
}
}
# 元素据服务代理
/*xxx: 服务元数据代理*/
public class DubboMetadataServiceProxy implements BeanClassLoaderAware, DisposableBean {
/*xxx: 服务发现客户端 */
private final DiscoveryClient discoveryClient;
/*xxx: 通过服务实例名 获取元数据服务 */
private DubboMetadataService getProxy0(String serviceName) {
return getProxy(discoveryClient.getInstances(serviceName));
}
/*xxx: 通过服务实例 获取元数据服务 代理 */
public DubboMetadataService getProxy(List<ServiceInstance> serviceInstances) {
/*xxx: 遍历 获取元数据服务路径 */
List<URL> dubboMetadataServiceURLs = getDubboMetadataServiceURLs(
serviceInstance.get());
for (URL dubboMetadataServiceURL : dubboMetadataServiceURLs) {
/*xxx: 根据元数据服务路径,创建元数据服务 代理 */
DubboMetadataService dubboMetadataService = createProxyIfAbsent(
dubboMetadataServiceURL);
if (dubboMetadataService != null) {
return dubboMetadataService;
}
}
}
/*xxx: 创建元数据服务代理 */
private DubboMetadataService createProxyIfAbsent(URL dubboMetadataServiceURL) {
String serviceName = dubboMetadataServiceURL.getParameter(APPLICATION_KEY);
String version = dubboMetadataServiceURL.getParameter(VERSION_KEY);
/*xxx: 创建元数据服务代理 */
return createProxyIfAbsent(serviceName, version);
}
/*xxx: 创建代理 */
protected DubboMetadataService createProxy(String serviceName, String version) {
return (DubboMetadataService) newProxyInstance(classLoader,
new Class[] { DubboMetadataService.class },
/*xxx: 代理实现*/
new DubboMetadataServiceInvocationHandler(serviceName, version,
dubboGenericServiceFactory));
}
}
# 元数据服务代理实现
/*xxx: dubbo元数据代理实现 */
class DubboMetadataServiceInvocationHandler implements InvocationHandler {
DubboMetadataServiceInvocationHandler(String serviceName, String version,
DubboGenericServiceFactory dubboGenericServiceFactory) {
/*xxx: 通过dubbo泛型工厂,创建 dubbo泛型服务 */
this.genericService = dubboGenericServiceFactory.create(serviceName,
DubboMetadataService.class, version);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
/*xxx: 通过泛型服务调用指定的方法*/
returnValue = genericService.$invoke(method.getName(),
getParameterTypes(method), args);
}
}
# 泛型服务工厂
/*xxx: 泛型工厂*/
public class DubboGenericServiceFactory {
@Autowired
/*xxx: 注册中心配置,为生产 dubbo泛型服务提供支持 */
private ObjectProvider<List<RegistryConfig>> registryConfigs;
/*xxx: 创建泛型服务 */
public GenericService create(String serviceName, Class<?> serviceClass,
String version) {
String interfaceName = serviceClass.getName();
ReferenceBean<GenericService> referenceBean = build(interfaceName, version,
serviceName, emptyMap());
return referenceBean.get();
}
/*xxx: 构建消费者*/
private ReferenceBean<GenericService> build(String interfaceName, String version,
String group, Map<String, Object> dubboTranslatedAttributes) {
String key = createKey(interfaceName, version, group, dubboTranslatedAttributes);
return cache.computeIfAbsent(key, k -> {
ReferenceBean<GenericService> referenceBean = new ReferenceBean<>();
referenceBean.setGeneric(true);
referenceBean.setInterface(interfaceName);
referenceBean.setVersion(version);
referenceBean.setGroup(group);
referenceBean.setCheck(false);
bindReferenceBean(referenceBean, dubboTranslatedAttributes);
return referenceBean;
});
}
/*xxx: 配置消费者 */
private void bindReferenceBean(ReferenceBean<GenericService> referenceBean,
Map<String, Object> dubboTranslatedAttributes) {
DataBinder dataBinder = new DataBinder(referenceBean);
// ignore "registries" field and then use RegistryConfig beans
dataBinder.setDisallowedFields("registries");
registryConfigs.ifAvailable(referenceBean::setRegistries);
}
}
# 服务上线
/*xxx: dubboCloud注册中心*/
public class DubboCloudRegistry extends FailbackRegistry
implements ApplicationListener<ServiceInstancesChangedEvent> {
/*xxx: dubbo元数据仓库, 所有的操作,都是围绕其进行 */
private final DubboServiceMetadataRepository repository;
@Override
/*xxx: 注册服务 */
public final void doRegister(URL url) {
synchronized (this) {
/*xxx: 初始化*/
preInit();
if (shouldNotRegister(url)) {
return;
}
/*xxx: 生产者进行注册, 注册到元数据仓库中的(注册到本地)*/
repository.exportURL(url);
}
}
}
@Repository
/*xxx: 元数据仓库 */
public class DubboServiceMetadataRepository
implements SmartInitializingSingleton, ApplicationEventPublisherAware {
/*xxx: 注册服务到注册中心(暴露服务)*/
public void exportURL(URL url) {
URL actualURL = url;
/*xxx: 将ip 进行拆分 */
InetUtils.HostInfo hostInfo = inetUtils.findFirstNonLoopbackHostInfo();
String ipAddress = hostInfo.getIpAddress();
if (!Objects.equals(url.getHost(), ipAddress)) {
actualURL = url.setHost(ipAddress);
}
/*xxx: 添加服务信息, 通过将serviceKey 与 url进行映射存储 */
/*xxx: serviceKey 由接口名+组名+版本号 组成*/
this.allExportedURLs.add(actualURL.getServiceKey(), actualURL);
}
}
# 消费订阅
/*xxx: dubboCloud注册中心*/
public class DubboCloudRegistry extends FailbackRegistry
implements ApplicationListener<ServiceInstancesChangedEvent> {
private void preInit() {
//xxx: 如果初始化时,已经存在订阅处理器实例,则进行初始化
urlSubscribeHandlerMap.forEach((url, handler) -> handler.init());
/*xxx: 初始化元数据*/
repository.initializeMetadata();
/*xxx: 监听实例变更事件 */
applicationContext.addApplicationListener(this);
}
@Override
//xxx: 消费者操作的端口,消费者通过 注册中心进行订阅
public final void doSubscribe(URL url, NotifyListener listener) {
synchronized (this) {
preInit();
/*xxx: 如果是监控订阅*/
if (isAdminURL(url)) {
//xxx: 略
}else if (isDubboMetadataServiceURL(url) && containsProviderCategory(url)) {
String appName = getServiceName(url);
/*xxx: 如果是元数据订阅*/
MetadataServiceSubscribeHandler handler = new MetadataServiceSubscribeHandler(
appName, url, listener, this, dubboMetadataUtils);
if (inited.get()) {
handler.init();
}
metadataSubscribeHandlerMap.put(appName, handler);
}
/*xxx: 如果是客户端订阅, 则所有的代理实现,由 GeneralServiceSubsribeHandler实现, 可以类比 dubbo的 泛型服务*/
else if (isConsumerServiceURL(url)) {
GenearalServiceSubscribeHandler handler = new GenearalServiceSubscribeHandler(
url, listener, this, repository, jsonUtils,
dubboMetadataConfigServiceProxy);
if (inited.get()) {
/*xxx: 进行初始化*/
handler.init();
}
/*xxx: 将路径 与 订阅处理器实例进行绑定*/
urlSubscribeHandlerMap.put(url, handler);
}
}
}
}
# 泛型订阅处理器
/*xxx: 服务订阅处理器 */
public abstract class AbstractServiceSubscribeHandler {
public void init() {
if (inited.compareAndSet(false, true)) {
doInit();
}
}
protected abstract void doInit();
}
//xxx: 泛型服务订阅处理器
public class GenearalServiceSubscribeHandler extends AbstractServiceSubscribeHandler {
//xxx: 泛型服务订阅处理初始化
public synchronized void doInit() {
/*xxx: 获取所有服务实例中,与当前 springCloud 兼容的 dubbo提供者 */
Map<String, Map<String, List<ServiceInstance>>> map = registry
.getServiceRevisionInstanceMap();
for (Map.Entry<String, Map<String, List<ServiceInstance>>> entry : map
.entrySet()) {
String appName = entry.getKey();
/*xxx: 获取当前服务的服务集群*/
Map<String, List<ServiceInstance>> revisionMap = entry.getValue();
for (Map.Entry<String, List<ServiceInstance>> revisionEntity : revisionMap
.entrySet()) {
String revision = revisionEntity.getKey();
List<ServiceInstance> instances = revisionEntity.getValue();
/*xxx: 订阅对应服务实例上的提供者*/
init(appName, revision, instances);
}
}
/*xxx: 获取到服务提供者信息后 进行refresh操作 */
refresh();
}
public synchronized void refresh() {
List<URL> urls = getProviderURLs();
/*xxx: 该操作,更新 directory的 提供者 invokers */
notifyAllSubscribedURLs(url, urls, listener);
}
public void init(String appName, String revision,
List<ServiceInstance> instanceList) {
/*xxx: 获取 服务提供者信息 */
List<URL> urls = getTemplateExportedURLs(url, revision, instanceList);
if (urls != null && urls.size() > 0) {
addAppNameWithRevision(appName, revision);
/*xxx: 缓存服务提供者信息 */
setUrlTemplate(appName, revision, urls);
}
}
private List<URL> getTemplateExportedURLs(URL subscribedURL, String revision,
List<ServiceInstance> serviceInstances) {
/*xxx: 获取dubbo元数据服务, 这个是通过dubbo调用获取到的结果*/
DubboMetadataService dubboMetadataService = getProxy(serviceInstances);
List<URL> templateExportedURLs = emptyList();
/*xxx: 获取到服务提供者信息*/
templateExportedURLs = getExportedURLs(dubboMetadataService, revision,
subscribedURL);
return templateExportedURLs;
}
private List<URL> getExportedURLs(DubboMetadataService dubboMetadataService,
String revision, URL subscribedURL) {
String serviceInterface = subscribedURL.getServiceInterface();
String group = subscribedURL.getParameter(GROUP_KEY);
String version = subscribedURL.getParameter(VERSION_KEY);
RpcContext.getContext().setAttachment(SCA_REVSION_KEY, revision);
//xxx: 此处调用 metadataDataService获取元数据信息
//xxx: metaDataService是一个消费者,这个最终会调用 dubbo获取
//xxx: 最终获取的动作是,在对应服务实例 内存中的 一个 map属性获取, 也就是 metaDataRepository
//xxx: 返回服务提供者的节点信息
String exportedURLsJSON = dubboMetadataService.getExportedURLs(serviceInterface,
group, version);
String subscribedProtocol = subscribedURL.getParameter(PROTOCOL_KEY);
//xxx: 将该服务提供者中,所有不需要协议,或者与消费者协议相匹配的,都筛选出来
return jsonUtils.toURLs(exportedURLsJSON).stream()
.filter(exportedURL -> subscribedProtocol == null
|| subscribedProtocol.equalsIgnoreCase(exportedURL.getProtocol()))
.collect(Collectors.toList());
}
}
# dubbo调用链路调试
- 消费者:调用应用接口 -> InvokerInvocationHandler.invoke -> 一系列Invoker.invoke -> 一系列Filter,包括ConsumerContextFilter.invoke -> AsyncToSyncInvoker.invoke -> DubboInvoker.invoke -> LazyConnectExchangeClient.request -> AbstractPeer.send -> NettyChannel.send
- 生产者:thread.run -> ThreadPoolExecutor.runWorker -> org.apache.dubbo.remoting.transport.DecodeHandler.received -> HeaderExchangeHandler.received -> HeaderExchangeHandler.handleRequest -> DubboProtocol.reply -> 一系列dubbo的filter的invoke -> DelegateProviderMetaDataInvoker.invoke -> JavassistProxyFactory.doInvoke -> 应用方法