# 概述

  • 阿里巴巴开源的一个高性能服务框架,使得应用可通过高性能RPC实现服务的输出和输入功能;
  • 高性能、轻量级的开源java RPC框架,提供了三个核心能力: 面向接口的远程方法调用智能容错和负载均衡服务自动注册和发现

# 发展史

  • 早期是阿里巴巴内部使用的一个分布式服务治理框架
  • 2012年开源
  • 很多公司基于自身业务特性,进行优化和改进,典型的有:京东JSF,新浪Motan,当当dubbox
  • 2014年10月,dubbo停止维护
  • 2017年9月,阿里宣布重启dubbo,并做好了长期投入的准备;
  • 2018年1月,dubbo创始人之一梁飞透露Dubbo3.0正在动工;
  • 2018年2月,dubbo捐献给Apache基金会;
  • 2019年5月21日,dubbo正式毕业,成为apache顶级开源项目;
  • 2020年,发布3.0往云原生项目发展的战略计划

云原生: 基于分布式部署统一运维管理的分布式云,以容器、微服务、DevOps等技术未基础建立的一套云技术产品体系

# 发布史

  • 2011.10,发布2.0.7,是开源后的第一个发布版;
  • 2012.08, 发布2.5.0;
  • 2017.09,发布2.5.4(中间暂停维护了);
  • 2019.01,发布2.7.0
  • 2021.02,发布2.7.9
  • 2021.06,发布3.0

# 工作原理

dubbo工作工作原理

# 架构设计

dubbo架构

# 实践

# 定义服务接口

/**
 * @author chenkh
 * @time 2021/7/28
 */
public interface HelloService {
    void sayHello(String name);

    void sayHi(String name);
}

# 定义dubbo工具类

/**
 * @author chenkh
 * @time 2021/7/26
 */
public class DubboUtils {

    public static  boolean  registerService(Class clazzImpl,Object serviceImpl,String appName,String registryCenter,String namespace){
        ServiceConfig serviceConfig = new ServiceConfig();
        serviceConfig.setInterface(clazzImpl);
        serviceConfig.setInterface(clazzImpl.getName());
        serviceConfig.setVersion("1.0");
        serviceConfig.setGroup("practice");
        serviceConfig.setRef(serviceImpl);

        // 普通编码配置方式
        ApplicationConfig application = new ApplicationConfig();
        // 客户端应用名:可以任意取名
        application.setName(appName);
        application.setLogger("slf4j");

        // 连接注册中心配置
        RegistryConfig registry1 = new RegistryConfig();
        registry1.setAddress(registryCenter);
        registry1.setGroup(namespace);
        List<RegistryConfig> registries = new ArrayList<>();
        registries.add(registry1);

        serviceConfig.setRegistries(registries);
        serviceConfig.setApplication(application);
        serviceConfig.export();
        return true;
    };
    
    //创建轻量级消费者引用
    private static ReferenceConfig createLightReferenceConfig(String appName,String registryCenter,String namespace) {
        // 普通编码配置方式
        ApplicationConfig application = new ApplicationConfig();
        // 客户端应用名:可以任意取名
        application.setName(appName);
        application.setLogger("slf4j");
        application.setQosPort(22223);

        // 连接注册中心配置
        RegistryConfig registry1 = new RegistryConfig();
        registry1.setAddress(registryCenter);
        registry1.setGroup(namespace);
        List<RegistryConfig> registries = new ArrayList<>();
        registries.add(registry1);

        // 引用远程服务
        // 当获取到实际的客户端连接后,该实例变得很重,里面封装了所有与注册中心及服务提供方连接,需要缓存
        // 此时,尚处于一个轻量级的实例
        ReferenceConfig reference = new ReferenceConfig();
        reference.setApplication(application);
        reference.setRegistries(registries);
        reference.setCheck(false);
        reference.setGroup("practice");
        return reference;
    }
    
    private static String getTag(String... tag) {
        String tagEnv = System.getProperty("dubbo.provider.tag");
        if (tag != null && tag.length > 0 && StringUtils.isNotBlank(tag[0])) {
            tagEnv = tag[0];
        }
        return tagEnv;
    }
    
    public static <T> T getService(Class<T> clazz, String version, String... tag) {
        if (clazz == null || StringUtils.isBlank(version)) {
            throw new IllegalArgumentException("DubboUtils#getService():参数不能为空.");
        }
        StringBuilder sb = new StringBuilder();
        String key = sb.append(clazz.getName()).append(":").append(version).toString();
        Object objService = serviceMap.get(key);
        if (objService != null && !StringUtils.equals(objService.getClass().getName(), clazz.getName())) {
            throw new IllegalArgumentException("DubboUtils#getService():返回值类型不符.期望:" + clazz.getName() +
                    ",实际:" + objService.getClass().getName());
        }
        if (serviceMap.containsKey(key)) {
            return (T) objService;
        }
        //首次使用时,创建轻量级客户端引用
        ReferenceConfig reference = createLightReferenceConfig("dubbo-generic-automan-practice-consumer","zookeeper://localhost:2181","practice");

        // 设置tag:用于灰度发布、线上程序调试
        String tagEnv = getTag(tag);
        if (StringUtils.isNotBlank(tagEnv)) {
            ConsumerConfig consumerConfig = new ConsumerConfig();
            consumerConfig.setTag(tagEnv);
            reference.setConsumer(consumerConfig);
        }
        reference.setVersion(version);
        // 弱类型接口名
        reference.setInterface(clazz);
        // 声明为泛化接口
        reference.setGeneric("false");

        //获取到客户端引用,此实例变得很重,需要缓存起来
        objService = reference.get();
        serviceMap.put(key, objService);
        return (T) objService;
    }
    
}

# 服务提供者

  • 通过指定启动参数:DUBBO_DUBBO_IP_TO_REGISTRY=100.0.0.1可实现服务生产者ip自定义配置
/**
 * @author chenkh
 * @time 2021/7/26
 */
public class DubboRunWithoutSpringPracticeProvider {

    public static void main(String[] args) {
        //注册服务
        DubboUtils.registerService(HelloService.class,new HelloServiceImpl(),"dubbo-generic-automan-practice-provider","zookeeper://localhost:2181","practice");
        Scanner scanner = new Scanner(System.in);
        scanner.nextLine();
    }
}

# 服务消费者

package com.automannn.dubbo.withoutSpring;

import com.automannn.dubbo.api.HelloService;
import com.automannn.dubbo.withoutSpring.util.DubboUtils;

/**
 * @author chenkh
 * @time 2021/7/28
 */
public class DubboRunWithoutSpringPracticeConsumer {

    public static void main(String[] args) {
        HelloService helloService= DubboUtils.getService(HelloService.class,"1.0","");
        helloService.sayHello("automannn");
        helloService.sayHi("automannn");
    }
}

# 源码

# SPI机制(抽象工厂,获取接口的实现类)

  • 扩展点加载器-容器机制
/*xxx: 用于加载 spi的实现类, 线程级别的单例,具有多种加载策略,包括 @SPI策略,@Adaptive策略,Spring的策略 */
    /*xxx: 扩展点加载器 */
public class ExtensionLoader<T> {
    /*xxx: 扩展点加载器缓存集合,同一类扩展点进行复用 */
    private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>(64);

    /*xxx: 扩展点的实现缓存,复用*/
    private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>(64);
    
    /*xxx: 扩展点工厂, 每个扩展点加载器实例,都具有一个扩展点工厂,仅用于注入属性*/
    private final ExtensionFactory objectFactory;
    
    /*xxx: 扩展点集合缓存,只会加载(扫描)一次 */
    private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();
    
    /*xxx: 是否具有扩展点注解 */
    private static <T> boolean withExtensionAnnotation(Class<T> type) {
        return type.isAnnotationPresent(SPI.class);
    }
    
    /*xxx: 有条件激活的扩展类 集合*/
    private final Map<String, Object> cachedActivates = new ConcurrentHashMap<>();
    
    /*xxx: 扩展点加载器 实例化*/
    /*xxx: 抽象工厂模式*/
    private ExtensionLoader(Class<?> type) {
        this.type = type;
        /*xxx: 1.当传入类型,是扩展点顶级工厂接口的时候,当前扩展点加载器实例的 扩展点工厂置空*/
        /*xxx: 2.其它类型时,则用 扩展点工厂为空的 扩展点加载器,加载 兼容的扩展点实现(本质上,就是获取工厂的默认实现) */
        objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }
    
    /*xxx: 获取兼容的扩展点 */
    public T getAdaptiveExtension() {
        /*xxx: 首次获取时,进行创建,之后缓存 */
        Object instance = cachedAdaptiveInstance.get();
     	instance = createAdaptiveExtension();
        return (T) instance;
    }
    
    /*xxx: 创建兼容的扩展点实例*/
    private T createAdaptiveExtension() {
    	return injectExtension((T) getAdaptiveExtensionClass().newInstance());   
    }
    
    /*xxx: 从工厂中,自动注入参数。 类似spring注入bean */
    private T injectExtension(T instance) {
        //略...
    }
    
    /*xxx: 获取兼容的扩展点类 */
    private Class<?> getAdaptiveExtensionClass() {
        getExtensionClasses();
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }
    
    /*xxx: 生成扩展点类(就是某个扩展点工厂的默认实现)*/
    private Class<?> createAdaptiveExtensionClass() {
        /*xxx: 生成动态代理类,该动态代理类  会根据 传入的 默认实现名称,生成源码 */
        String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
        ClassLoader classLoader = findClassLoader();
        org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        return compiler.compile(code, classLoader);
    }
    
    /*xxx: 获取扩展点类集合*/
    private Map<String, Class<?>> getExtensionClasses() {
        Map<String, Class<?>> classes = cachedClasses.get();
        /*xxx: 先检查缓存中,是否有配置信息,没有则会从  META-INF/services/, META-INF/dubbo/,META-INF/dubbo/internal这几个路径下加载*/
        /*xxx: 2.7.7版本,将该功能抽成了 策略模式 */
        /*xxx: 加载扩展点类 集合*/
        classes = loadExtensionClasses();
        cachedClasses.set(classes);
        
        return classes;
    }
    
    /*xxx: 加载扩展点集合类 */
    private Map<String, Class<?>> loadExtensionClasses() {
        /*xxx: 检查是否有  @SPI注解,如果有,则获取注解中填写的名称,并缓存为默认实现名,如 @SPI("impl"),则会保存 impl 为默认实现*/
        cacheDefaultExtensionName();

        Map<String, Class<?>> extensionClasses = new HashMap<>();

        /*xxx: 该项目捐献给 apache后,对alibaba的做了兼容 */
        /*xxx: 策略提供路径,type提供名称*/
        for (LoadingStrategy strategy : strategies) {
            loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
            loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
        }

        return extensionClasses;
    }
    
    /*xxx: 检查是否配置了 默认扩展点实现*/
    private void cacheDefaultExtensionName() {
        /*xxx: 检查是否有  @SPI注解,如果有,则获取注解中填写的名称,并缓存为默认实现名,如 @SPI("impl"),则会保存 impl 为默认实现*/
        final SPI defaultAnnotation = type.getAnnotation(SPI.class);
        if (defaultAnnotation == null) {
            return;
        }
        String value = defaultAnnotation.value();
        cachedDefaultName =value;
    }
    
    /*xxx: 加载类,会自动处理  @Adaptive注解,  */
    private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name,
                           boolean overridden) throws NoSuchMethodException {
        
        /*xxx: 如果是自适应类(Adaptive) 则缓存, 缓存的自适应类只能有一个*/
        if (clazz.isAnnotationPresent(Adaptive.class)) {
            cacheAdaptiveClass(clazz, overridden);
            /*xxx: 如果是包装扩展类(Wrapper), 则直接加入包装扩展类的 Set集合*/
        } else if (isWrapperClass(clazz)) {
            cacheWrapperClass(clazz);
        } else {
            /*xxx: 自动激活注解 (Activate), 则缓存到自动激活的缓存中*/
            clazz.getConstructor();
         	cacheActivateClass(clazz, names[0]);   
        }
    }
    
     private void cacheName(Class<?> clazz, String name) {
        if (!cachedNames.containsKey(clazz)) {
            cachedNames.put(clazz, name);
        }
    }
    
    /*xxx: 缓存自适应类 */
    private void cacheAdaptiveClass(Class<?> clazz, boolean overridden) {
        /*xxx: 如果发现有多个自适应类,则抛出异常*/
        if (cachedAdaptiveClass == null || overridden) {
            cachedAdaptiveClass = clazz;
        } else if (!cachedAdaptiveClass.equals(clazz)) {
            throw new IllegalStateException("More than 1 adaptive class found: "
                    + cachedAdaptiveClass.getName()
                    + ", " + clazz.getName());
        }
    }
    
    /*xxx: 获取自动激活的扩展类*/
    public List<T> getActivateExtension(URL url, String key) {
        return getActivateExtension(url, key, null);
    }
    
    public List<T> getActivateExtension(URL url, String key, String group) {
        String value = url.getParameter(key);
        return getActivateExtension(url, StringUtils.isEmpty(value) ? null : COMMA_SPLIT_PATTERN.split(value), group);
    }
    
    
    public List<T> getActivateExtension(URL url, String[] values, String group) {
     	/*xxx: 遍历 @Activate注解集合,根据传入的匹配条件,得到所偶符合激活条件的扩展类实现*/
            for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
                String name = entry.getKey();
                Object activate = entry.getValue();

                String[] activateGroup, activateValue;

                if (activate instanceof Activate) {
                    activateGroup = ((Activate) activate).group();
                    activateValue = ((Activate) activate).value();
                }
                
                if (isMatchGroup(group, activateGroup)
                        && !names.contains(name)
                        && !names.contains(REMOVE_VALUE_PREFIX + name)
                        && isActive(activateValue, url)) {
                    activateExtensions.add(getExtension(name));
                }
            }
                
           /*xxx: 返回所有自动激活类集合*/
        	return activateExtensions;
    }
}
  • 扩展点加载器-获取实现类的三种方式
//todo
  • 扩展点注解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
/*xxx: 声明 dubbo扩展点*/
/*xxx: Service Provider Interface, 是 JAVA提供的一套 用来被第三方实现 或者 扩展的 接口。
     它可以用来启用框架扩展和替换组件。SPI的作用, 就是为这些被扩展的API寻找服务实现 */
/*xxx: SPI 是调用方 指定的 接口规范, 提供给外部用来实现,主要是框架扩展人员使用。*/
public @interface SPI {
    String value() default "";
}

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
/*xxx: 作为 spi 的一个附加实现,通过此注解,可以实现 接口实现的优先级关系 */
/*xxx: 加在类上,则某个类为 spi的默认实现, 加在方法上,则方法支持根据参数选择实现类*/
public @interface Adaptive {
    String[] value() default {};
}

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
/*xxx: 该注解 用于 自动激活某些与给定规则匹配的 扩展接口实现*/
public @interface Activate {
    /*xxx: 激活与当前分组相匹配的扩展接口实现 */
    String[] group() default {};
    
    /*xxx: 激活 与给定的 配置相匹配的 扩展*/
    String[] value() default {};
}

# 工厂机制(容器,实现自动注入)

@SPI
/*xxx: 扩展点工厂*/
public interface ExtensionFactory {
    /*xxx: 获取扩展点*/
    <T> T getExtension(Class<T> type, String name);
}

@Adaptive
/*xxx: 作为一开始的默认工厂进行实现,默认是以 dubbo spi管理的容器*/
/*xxx: 工厂代理*/
public class AdaptiveExtensionFactory implements ExtensionFactory {
    
    /*xxx: 用来缓存所有工厂实现,包括 SpiExtensionFactory, SpringExtensionFactory*/
    private final List<ExtensionFactory> factories;
    
    public AdaptiveExtensionFactory() {
        /*xxx: 工厂列表也是通过 SPI 实现的,因此可以在这里获取所有工厂的扩展点加载器*/
        ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
        List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
        /*xxx: 遍历所有的工厂名称,获取对应的工厂,并保存到 factories 列表中*/
        for (String name : loader.getSupportedExtensions()) {
            list.add(loader.getExtension(name));
        }
        /*xxx: 会通过TreeSet进行排序,SPI排在前面,Spring排在后面*/
        /*xxx: 当调用 getExtension方法时,会遍历工厂,先从 SPI容器中获取扩展类,如果没找到,再从Spring容器中查找*/
        factories = Collections.unmodifiableList(list);
    }

    @Override
    public <T> T getExtension(Class<T> type, String name) {
        /*xxx: 遍历所有的工厂进行查找,顺序是 SPI -> Spring*/
        for (ExtensionFactory factory : factories) {
            T extension = factory.getExtension(type, name);
            if (extension != null) {
                return extension;
            }
        }
        return null;
    }
}
  • Spring容器

从spring上下文中获取依赖;

  • SPI容器
/*xxx: 通过工厂类,加载 注解了 @SPI的实现类 */
public class SpiExtensionFactory implements ExtensionFactory {

    @Override
    public <T> T getExtension(Class<T> type, String name) {
        /*xxx: 获取扩展点接口对应的Adaptive实现类*/
        if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
            /*xxx: 根据类型,获取所有的扩展点加载器*/
            ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(type);
            /*xxx: 如果缓存的扩展点类不为空,则直接返回 Adaptive实例*/
            /*xxx: 实际返回的是某一个接口的注解了 @Adaptive的默认实现类,或者注解了 @Adaptive方法的动态代理类,可实现根据参数,决定适配类*/
            if (!loader.getSupportedExtensions().isEmpty()) {
                return loader.getAdaptiveExtension();
            }
        }
        return null;
    }

}

# URL实例抽象

/*xxx: 统一资源定位符,一般情况下,由多个配置组成,包括: 协议类型,主机,端口,路径等*/
public 
class URL implements Serializable {
    /*xxx: 协议类型*/
    private final String protocol;

    /*xxx:账户名*/
    private final String username;

    /*xxx:密码*/
    private final String password;

    // by default, host to registry
    /*xxx:默认主机*/
    private final String host;

    // by default, port to registry
    /*xxx:默认端口*/
    private final int port;

    /*xxx:路径*/
    private final String path;

    /*xxx:参数*/
    private final Map<String, String> parameters;

    /*xxx:方法参数*/
    private final Map<String, Map<String, String>> methodParameters;
    
}

# RPC机制的实现-dubbo协议

  • 协议层
@SPI("dubbo")
/*xxx: 协议抽象*/
public interface Protocol {
    /*xxx: 默认协议端口 */
    int getDefaultPort();
    
    @Adaptive
    /*xxx: 暴露服务给远程调用 */
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    
    @Adaptive
    /*xxx: 获取远程服务代理*/
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    
    /*xxx: 获取所有提供该协议的服务器: 由地址字符串,以及服务描述URL组成 */
    default List<ProtocolServer> getServers() {
        return Collections.emptyList();
    }
}

/*xxx: 抽象协议*/
public abstract class AbstractProtocol implements Protocol {
    /*xxx: 协议暴露服务的缓存 */
    protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
    
    /*xxx: 协议服务器缓存 */
    protected final Map<String, ProtocolServer> serverMap = new ConcurrentHashMap<>();
    
    /*xxx: 构造服务的key */
    protected static String serviceKey(URL url) {
        /*xxx: 获取绑定的端口*/
        int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
        return serviceKey(port, url.getPath(), url.getParameter(VERSION_KEY), url.getParameter(GROUP_KEY));
    }

    /*xxx: 根据 服务的端口,服务名称,服务版本,服务分组构建 服务键名 */
    protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
        return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup);
    }
    
    @Override
    /*xxx: 引用远程服务*/
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        /*xxx: 使用异步转同步的 装饰执行器 */
        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    }
    
    /*xxx: 通过协议引用远程服务*/
    protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException;
}

/*xxx: dubbo协议,实现远程服务调用 */
public class DubboProtocol extends AbstractProtocol {
    /*xxx: 默认的dubbo远程服务暴露端口 */
    public static final int DEFAULT_PORT = 20880;
    
    /*xxx: dubbo协议实体,单例模式实现 */
    private static DubboProtocol INSTANCE;
    
    @Override
    /*xxx: 暴露服务*/
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        /*xxx: 首先从执行器中,获取 URL信息 */
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        /*xxx: dubbo协议 的exporter也会在本地维护一份缓存 */
        exporterMap.put(key, exporter);
        /*xxx: 省略部分抽象...*/
        
        /*xxx:通过信息交换,以及网络传输层,将服务暴露 */
        openServer(url);
        
        /*xxx: 省略部分抽象...*/
        return exporter;
    }
    
    /*xxx: 创建协议服务器 */
    private void openServer(URL url) {
        /*xxx: 获取服务器地址 */
        /*xxx: 某个机器已经启动了netty服务,则不会再启动*/
        String key = url.getAddress();
        /*xxx: 调用此代码,默认作为服务端提供外部服务,也可以显式设置值,禁止此特性 */
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        
        if (isServer) {
            /*xxx: 以当前的key,获取服务端实例 */
            ProtocolServer server = serverMap.get(key);
            if (server == null) {
                /*xxx: 如果没有获取到,则创建*/
            	/*xxx: 根据url信息,创建服务 */
            	serverMap.put(key, createServer(url));
            }else{
                //xxx: 如当前地址,已开启服务,则调用reset方法
                //该方法主要作用应该是记录,往注册中心记录,暂未跟代码了
                server.reset(url);
            }
            
        }
    }
    
    /*xxx: 创建url服务*/
    private ProtocolServer createServer(URL url) {
        url = URLBuilder.from(url)
                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                /*xxx: 默认情况下允许心跳机制,1分钟 */
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                .addParameter(CODEC_KEY, DubboCodec.NAME)
                .build();
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
        
        /*xxx: 信息交换服务器,绑定url */
        ExchangeServer server = Exchangers.bind(url, requestHandler);
        
        return new DubboProtocolServer(server);
    }
    
    /*xxx: 请求处理适配器*/
    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        @Override
        /*xxx: 答复消息*/
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
            /*xxx: 消息类型不是 Invocation的话,则 抛错*/
            if (!(message instanceof Invocation)) {
                //抛错...
            }
            
            Invocation inv = (Invocation) message;
            /*xxx: 从 Invocation中,获取invoker */
            Invoker<?> invoker = getInvoker(channel, inv);
            
            /*xxx: 检查调用合法性(方法是否存在),略*/
            
            /*xxx: 通过invoker 执行,并获取结果 */
            Result result = invoker.invoke(inv);
            return result.thenApply(Function.identity());
            
        }
        
        @Override
        /*xxx: 接收到消息*/
        public void received(Channel channel, Object message) throws RemotingException {
            /*xxx: 接收到信息时,进行分派*/
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }
        
        /*xxx: 根据url,创建本地Invocation */
        private Invocation createInvocation(Channel channel, URL url, String methodKey) {
            /*xxx: 根据方法的键,获取对应的方法*/
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }

            /*xxx: 设置远程调用的相关参数 */
            RpcInvocation invocation = new RpcInvocation(method, url.getParameter(INTERFACE_KEY), new Class<?>[0], new Object[0]);
            invocation.setAttachment(PATH_KEY, url.getPath());
            invocation.setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY));
            invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
            invocation.setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY));
            if (url.getParameter(STUB_EVENT_KEY, false)) {
                invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
            }

            return invocation;
        }
    };
}
  • 交换层
/*xxx: 服务交换器外观模式 */
public class Exchangers {
    
    /*xxx: 服务交换层绑定地址,返回服务交换实例 */
    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }
    
    /*xxx: 获取交换服务实例*/
    public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }
    
    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }
    
}

@SPI(HeaderExchanger.NAME)
/*xxx: 信息交换器的抽象工厂*/
public interface Exchanger {
 	@Adaptive({Constants.EXCHANGER_KEY})
    /*xxx: 将url与交换处理器绑定 */
    /*xxx: 服务端使用 */
    ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;
    
    @Adaptive({Constants.EXCHANGER_KEY})
    /*xxx: 连接url,并提供 交换处理器 */
    /*xxx: 客户端使用*/
    ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}

/*xxx: 默认的消息发送器: header*/
/*xxx: dubbo的客户端与服务端实现,都基于此*/
public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    /*xxx: 重型实例*/
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        /*xxx: 默认的连接实现,调用 网络传输层进行处理 */
        /*xxx: 信息交换处理器,由 解码处理器装饰实现*/
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    /*xxx: 重型实例*/
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        /*xxx: 默认的绑定实现, 调用 网络传输层进行实现 */
        /*xxx: 信息交换处理器,由 解码处理器装饰实现*/
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}
  • 传输层
/*xxx: 网络传输层工厂*/
public class Transporters {
    
    /*xxx: 将 url 与 信道处理器进行绑定 */
    public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            /*xxx: 如果有多个信道处理器,则需要处理器进行分发 */
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }
}

@SPI("netty")
/*xxx: 网络传输层 */
public interface Transporter {
    
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    /*xxx: 服务端调用*/
    RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;
    
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    /*xxx: 客户端调用 */
    Client connect(URL url, ChannelHandler handler) throws RemotingException;
    
}

/*xxx: 默认的 网络传输层实现,netty4 */
public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    /*xxx: 服务端 */
    public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyServer(url, handler);
    }

    @Override
    /*xxx: 客户端*/
    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyClient(url, handler);
    }

}
  • 网络实现
/*xxx: netty服务端*/
public class NettyServer extends AbstractServer implements RemotingServer {
    
    /*xxx: 每次新增暴露服务的时候,都会新生成该实例*/
 	public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
     	super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));   
    }
    
    @Override
    protected void doOpen() throws Throwable {
        /*xxx: 省略netty的使用方式...*/
     	ChannelFuture channelFuture = bootstrap.bind(getBindAddress());   
    }
    
}

/*xxx: 抽象网络服务端 */
public abstract class AbstractServer extends AbstractEndpoint implements RemotingServer {
 	//本地端口
    private InetSocketAddress localAddress;   
    
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
     	/*xxx: 获取绑定的ip*/
        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        /*xxx: 获取绑定的端口 */
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        
        /*xxx: 打开服务*/
        doOpen();
    }
    
    /*xxx: 开启服务端服务*/
    protected abstract void doOpen() throws Throwable;
    
    /*xxx: 获取待绑定接口*/
    public InetSocketAddress getBindAddress() {
        return bindAddress;
    }
}

一句话总结:暴露网络端口,客户端传来path,服务端通过path在本地缓存找exporter,缓存位于exporterMap中,且必须有。

# 泛化调用

  • 案例
package com.automannn.dubbo.bean;

import java.io.Serializable;

/**
 * @author automannn
 * @Date 2022/4/20
 */
public class MyDTO implements Serializable {
    private String id;

    private String name;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "MyDTO{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                '}';
    }
}
package com.automannn.dubbo.api;

import com.automannn.dubbo.bean.MyDTO;

/**
 * @author chenkh
 * @time 2021/7/28
 */
public interface HelloService {
    void sayHello(String name);

    void sayHi(String name);

    String invokeBean(MyDTO myDTO);
}
package com.automannn.dubbo.genericRpc;

import com.automannn.dubbo.api.HelloService;
import com.automannn.dubbo.provider.HelloServiceImpl;
import com.automannn.dubbo.withoutSpring.util.DubboUtils;

import java.util.Scanner;

/**
 * @author automannn
 * @Date 2022/4/20
 */
public class ApiServiceProvider {
    public static void main(String[] args) {
        DubboUtils.registerService(HelloService.class,new HelloServiceImpl(),"dubbo-generic-automan-practice-provider","zookeeper://localhost:2181","practice");
        Scanner scanner = new Scanner(System.in);
        scanner.nextLine();
    }
}
package com.automannn.dubbo.genericRpc;

import com.automannn.dubbo.api.HelloService;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.rpc.service.GenericService;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * @author automannn
 * @Date 2022/4/20
 * @Description 泛化调用
 */
public class ApiGenericConsumer {
    public static void main(String[] args) {
        ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
        referenceConfig.setApplication(new ApplicationConfig("my-generic-consumer"));

        RegistryConfig registry1 = new RegistryConfig();
        registry1.setAddress("zookeeper://localhost:2181");
        registry1.setGroup("practice");
        List<RegistryConfig> registries = new ArrayList<>();
        registries.add(registry1);

        referenceConfig.setRegistries(registries);

        //xxx: 泛化调用,消费端可以不依赖api接口
        referenceConfig.setInterface("com.automannn.dubbo.api.HelloService");
        referenceConfig.setVersion("1.0");
        referenceConfig.setGroup("practice");
        //xxx: 设置为泛化调用
        referenceConfig.setGeneric("true");

        GenericService genericService = referenceConfig.get();
        genericService.$invoke("sayHello",new String[]{"java.lang.String"},new Object[]{"generic Call"});

        HashMap<String,Object> param = new HashMap<>();
        param.put("id","1");
        param.put("name","automannn");
        String result = (String) genericService.$invoke("invokeBean",new String[]{"com.automannn.dubbo.bean.MyDTO"},new Object[]{param});
        System.out.println(result);
    }
}
  • 有无API接口依赖

有不有接口依赖,比较大的区别是,可以通过动态代理,将底层的转换屏蔽掉,给用户原生的使用体验,但其底层的转换,依然是逃不掉的。从这个意义上讲,有没有接口依赖,并无大的大别

  • 设置泛化调用参数的影响

设置泛化调用后,生成的动态代理,既实现了原本的接口,也实现了泛化接口GenericService,形成的多态调用。换言之,泛化调用是兼容不泛化时的情况的

# 注册中心机制

  • 顶层设计
/*xxx: 注册中心服务 */
public interface RegistryService {
 	/*xxx: 注册服务*/
    void register(URL url);
    
    /*xxx: 注销服务*/
    void unregister(URL url);
    
    /*xxx: 订阅服务*/
    void subscribe(URL url, NotifyListener listener);
    
    /*xxx: 取消订阅*/
    void unsubscribe(URL url, NotifyListener listener);
    
    /*xxx: 查找服务*/
    List<URL> lookup(URL url);
}

/*xxx: API 或者 SPI 节点特性*/
public interface Node {
 	/*xxx: 是否可达 */
    boolean isAvailable();   
}

/*xxx: 注册中心接口 */
public interface Registry extends Node, RegistryService {
 	/*xxx: 注册服务*/
    default void reExportRegister(URL url) {
        register(url);
    }

    /*xxx: 注销服务*/
    default void reExportUnregister(URL url) {
        unregister(url);
    }   
}
  • 实现设计(zk)
/*xxx: zk实现的注册中心 */
public class ZookeeperRegistry extends FailbackRegistry {
    
    /*xxx: zk客户端,是zk注册中心的关键*/
    private final ZookeeperClient zkClient;
    
 	public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
     	/*xxx: 默认的zk命名空间:dubbo */
        this.root = group;
        /*xxx: zk客户端实例 */
        zkClient = zookeeperTransporter.connect(url);
        /*xxx: zk状态监测回调*/
        zkClient.addStateListener((state) -> {
        	//xxx: 略
        });
    }
    
    @Override
    /*xxx: 节点是否可达*/
    public boolean isAvailable() {
        return zkClient.isConnected();
    }
    
    @Override
    /*xxx: 服务注册*/
    public void doRegister(URL url) {
     	/*xxx: zk注册中心,就是创建一个临时节点*/
        /*xxx: 注意,这个临时节点的根节点,又称为命名空间,不指定的情况下,为dubbo,这特别重要*/
        /*xxx: 尤其是当 当前为消费者时,由于消费者可能开启了check,导致问题不好排查*/
        /*xxx: 另外,命名空间与 服务分组,在dubbo的api上面表现出来很相似,但它们是完全不同的两个概念,*/
        zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));   
    }
    
    @Override
    /*xxx: 服务注销,就是删除临时节点*/
    public void doUnregister(URL url) {
     	zkClient.delete(toUrlPath(url));   
    }
    
    @Override
    /*xxx: 节点订阅,节点变化时,收到通知,并做出相应行为*/
    public void doSubscribe(final URL url, final NotifyListener listener) {
     	//略  
    }
    
    @Override
    /*xxx: 取消节点订阅*/
    public void doUnsubscribe(URL url, NotifyListener listener) {
     	//xxx: 略   
    }
    
    @Override
    /*xxx: 查找节点*/
    public List<URL> lookup(URL url) {
     	/*xxx: 获取提供者列表*/
        List<String> providers = new ArrayList<>();   
        /*xxx: 获取特定路径下的,所有子节点信息*/
        for (String path : toCategoriesPath(url)) {
                List<String> children = zkClient.getChildren(path);
                if (children != null) {
                    providers.addAll(children);
                }
        }
        return toUrlsWithoutEmpty(url, providers);
    }
    
    /*xxx: 获取路径参数,如果指定了通配为 *,则会把 提供者,消费者,路由,配置等路径全部包含*/
    /*xxx: 否则只包含 提供者的路径*/
    private String[] toCategoriesPath(URL url) {
        //xxx: 略...
    }
    
    /*xxx: 从所有的提供者里面,找到与消费者相匹配的提供者,并返回*/
    private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
     	/*xxx: 先判断协议是否相匹配*/
        if (provider.contains(PROTOCOL_SEPARATOR_ENCODED)) {
            /*xxx: 将提供者反序列化为url*/
            URL url = URLStrParser.parseEncodedStr(provider);
            /*xxx: 判断消费者的url与提供者url是否匹配*/
            if (UrlUtils.isMatch(consumer, url)) {
                urls.add(url);
             }
         }
        return urls;
    }
}
  • 实现设计(nacos)
/*xxx: nacos注册中心 */
public class NacosRegistry extends FailbackRegistry {
 	/*xxx: nacos客户端 */
    private final NamingService namingService;
    
    public NacosRegistry(URL url, NamingService namingService) {
        super(url);
        this.namingService = namingService;
    }
    
    @Override
    /*xxx: 节点可用性检查*/
    public boolean isAvailable() {
        return "UP".equals(namingService.getServerStatus());
    }
    
    @Override
    /*xxx: 查找服务*/
    public List<URL> lookup(final URL url) {
        final List<URL> urls = new LinkedList<>();
        execute(namingService -> {
            Set<String> serviceNames = getServiceNames(url, null);
            for (String serviceName : serviceNames) {
                /*xxx: 获取所有的服务实例*/
                List<Instance> instances = namingService.getAllInstances(serviceName,
                        getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP));
                urls.addAll(buildURLs(url, instances));
            }
        });
        return urls;
    }
    
    @Override
    /*xxx: nacos注册服务*/
    public void doRegister(URL url) {
        /*xxx: 首先获取url的服务名称 */
        final String serviceName = getServiceName(url);
        /*xxx: 根据 url创建客户端实例信息  */
        final Instance instance = createInstance(url);
     	/*xxx: 通过nacos的客户端api规范,注册服务,同样又命名空间的概念 */
        execute(namingService -> namingService.registerInstance(serviceName,
                getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP), instance));   
    }
    
     @Override
    /*xxx: 注销服务*/
    public void doUnregister(final URL url) {
        execute(namingService -> {
            String serviceName = getServiceName(url);
            Instance instance = createInstance(url);
            namingService.deregisterInstance(serviceName,
                    getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP),
                    instance.getIp()
                    , instance.getPort());
        });
    }
}
  • 应用设计-提供端
/*xxx: 注册中心协议 */
public class RegistryProtocol implements Protocol {
    @Override
    /*xxx: 注册到认证中心 */
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        /*xxx: 本地服务暴露,该方法执行后,实际上已经可以通过url的方式访问服务了 */
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        /*xxx: 注册到注册中心*/
        register(registryUrl, registeredProviderUrl);
    }

    /*xxx: 注册实现,见上方顶层设计*/
    private void register(URL registryUrl, URL registeredProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registeredProviderUrl);
    }

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
        protocol.export(invokerDelegate);
        //xxx: 省略其他抽象...
    }
}
  • 应用设计-消费端
/*xxx:注册中心路径抽象 */
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    /*xxx: 消费端通过订阅提供端节点信息,动态更新 invokers */
    private void refreshInvoker(List<URL> invokerUrls) {

        /*xxx: 直接根据url 转换为 Invoker*/
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);

        /*xxx: routerChain中的invoker设置好后,后续便可以直接根据它调用*/
        routerChain.setInvokers(newInvokers);
    }

    /*xxx: 根据url构造invoker */
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        /*xxx: 遍历提供者地址*/
        for (URL providerUrl : urls) {
            /*xxx: 根据url,构建invoker*/
            invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
        }
    }
}

# 完整调用机制

# 服务注册流程

/*xxx: 服务提供者配置,单独服务暴露时,可以以此为起点进行查看  */
/*xxx: 它 具有基类配置的特性 */
public class ServiceConfig<T> extends ServiceConfigBase<T> {
    
    public synchronized void export() {
        /*xxx: service自带初始上下文特性 */
        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.initialize();
        }

        /*xxx: 实际的服务暴露方法*/
        doExport();
    }

    protected synchronized void doExport() {
        doExportUrls();
    }

    /*xxx: 本地服务托管,以及特定的服务协议路径映射 */
    private void doExportUrls() {
        /*xxx; 获取 服务容器*/
        /*xxx: 远程调用的本质: 路径->接口->实现类*/
        ServiceRepository repository = ApplicationModel.getServiceRepository();

        ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
        /*xxx: 绑定接口跟实现关系 */
        repository.registerProvider(
                getUniqueServiceName(),
                ref,
                //xxx: serviceProvider中,具有接口信息
                serviceDescriptor,
                this,
                serviceMetadata
        );

        /*xxx: 从当前的服务配置中,获取 注册中心地址信息*/
        List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

        /*xxx: 遍历服务暴露协议,并向指定注册中心注册,同时,将暴露的服务路径,与接口映射起来*/
        for (ProtocolConfig protocolConfig : protocols) {
            /*xxx: 构造 键名 */
            String pathKey = URL.buildKey(getContextPath(protocolConfig)
                    .map(p -> p + "/" + path)
                    .orElse(path), group, version);

            /*xxx: 将协议路径,与接口映射起来*/
            repository.registerService(pathKey, interfaceClass);
            
            //xxx: 注册中心注册节点,本质上就是注册一串文本信息,从这个意义上讲,注册中心不那么玄妙
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

    /*xxx: 为某种协议,注册节点 */
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        /*xxx: 获取协议名称,没有指定的情况下,使用dubbo */
        String name = protocolConfig.getName();
        if (StringUtils.isEmpty(name)) {
            name = DUBBO;
        }

        Map<String, String> map = new HashMap<String, String>();
        /*xxx: 设置应用的类型,为提供方还是消费方 */
        map.put(SIDE_KEY, PROVIDER_SIDE);

        /*xxx: 处理方法集合,将所有的方法以及参数类型,序列化为文本,并作为 属性,放入map中*/
        if (CollectionUtils.isNotEmpty(getMethods())) {
            //xxx: 略...
        }

        /*xxx: 是否是泛化服务,如果是泛化服务,则methods的值为*,否则为所有的方法名字符串 */
        if (ProtocolUtils.isGeneric(generic)) {
            map.put(GENERIC_KEY, generic);
            map.put(METHODS_KEY, ANY_VALUE);
        } else {
            map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }

        /*xxx: 获取本机暴露服务的ip 及 端口 信息*/
        String host = findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = findConfigedPorts(protocolConfig, name, map);
        /*xxx: 根据本机信息,构建 url*/
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

        /*xxx: 生成Invoker,该 invoker 是一个实现了接口的动态代理类 */
        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

        /*xxx: 生成exporter,在此时,服务已经对外进行了暴露*/
        /*xxx: 此时的URL为: registry://localhost:2181,换言之,协议为registry.如果此时没有注册中心,则时dubbo协议*/
        Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
        /*xxx: 并将暴露的服务信息缓存起来 */
        exporters.add(exporter);
        
    }
}

服务暴露,最核心的方法就在于Exporter<?> exporter = PROTOCOL.export(wrapperInvoker)

# 默认协议的实现
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
injvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
http=org.apache.dubbo.rpc.protocol.http.HttpProtocol
registry=org.apache.dubbo.registry.integration.RegistryProtocol
qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper

实际的暴露链为: qos->filter->listener->registry(注册中心协议内部调用dubbo) ---> qos->filter->listener->dubbo 具体协议的包装过程,由ExtentionLoader实现,略

# 服务调用流程

/*xxx: 消费者接口层配置*/
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
    
    /*xxx: 获取消费者代理*/
    public synchronized T get() {
        init();
        return ref;
    }

    public synchronized void init() {
        /*xxx: 初始化dubboBootstrap */
        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.init();
        }

        /*xxx: 设置服务的类型,为消费者端*/
        map.put(SIDE_KEY, CONSUMER_SIDE);

        /*xxx: 获取dubbo服务的ip地址*/
        String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
        /*xxx: 如果没有在系统层面进行配置,则获取本机的地址*/
        if (StringUtils.isEmpty(hostToRegistry)) {
            hostToRegistry = NetUtils.getLocalHost();
        }

        /*xxx: 根据接口信息,创建服务代理 */
        ref = createProxy(map);
    }

    /*xxx: 根据服务元数据,创建代理 */
    private T createProxy(Map<String, String> map) {
        /*xxx: 根据实际的协议进行引用 */
        invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
    }
}

本质上,通过注册中心协议实现负载均衡,通过其他应用协议如dubbo,grpc实现远程调用

/*xxx: 注册中心协议 */
public class RegistryProtocol implements Protocol {
    
    @Override
    /*xxx: 注册中心引用 */
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        Registry registry = registryFactory.getRegistry(url);

        /*xxx: 获取提供者集群,没指定的情况下,使用 MockCluster*/
        Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
        return doRefer(cluster, registry, type, url);
    }

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);

        /*xxx: 消费者是否需要注册 */
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(subscribeUrl);
            /*xxx: 将消费者进行注册*/
            registry.register(directory.getRegisteredConsumerUrl());
        }

        /*xxx: 订阅生产者节点 */
        directory.subscribe(toSubscribeUrl(subscribeUrl));

        /*xxx: 获取代理 */
        Invoker<T> invoker = cluster.join(directory);
    }
}
/*xxx: 模拟集群实现*/
public class MockClusterWrapper implements Cluster {
    
    private Cluster cluster;
    
    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }
}

public class FailoverCluster extends AbstractCluster {

    public final static String NAME = "failover";

    @Override
    public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<>(directory);
    }

}

public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {
    
    @Override
    public Result invoke(final Invocation invocation) throws RpcException {
        /*xxx: 从注册中心,获取提供者列表*/
        List<Invoker<T>> invokers = list(invocation);

        /*xxx: 构造负载均衡器*/
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);

        return doInvoke(invocation, invokers, loadbalance);
    }

    protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                       LoadBalance loadbalance) throws RpcException;

    /*xxx: 列出提供者列表*/
    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        return directory.list(invocation);
    }
}

/*xxx: 默认负载均衡策略: 错误重试*/
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
    @Override
    /*xxx: 根据负载均衡策略,以及重试次数进行调用 */
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        //xxx: 略...
    }
}
/*xxx: 目录接口 */
public interface Directory<T> extends Node {
    /*xxx: 列出 invocation的invokers列表 */
    List<Invoker<T>> list(Invocation invocation) throws RpcException;
}

/*xxx: 抽象路径*/
public abstract class AbstractDirectory<T> implements Directory<T> {
    @Override
    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        return doList(invocation);
    }

    protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
}

/*xxx:注册中心路径抽象 */
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    @Override
    public List<Invoker<T>> doList(Invocation invocation) {
        
        List<Invoker<T>> invokers = routerChain.route(getConsumerUrl(), invocation);
        
        return invokers;
    }

    /*xxx: 消费端通过订阅提供端节点信息,动态更新 invokers */
    private void refreshInvoker(List<URL> invokerUrls) {

        /*xxx: 直接根据url 转换为 Invoker*/
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);

        /*xxx: routerChain中的invoker设置好后,后续便可以直接根据它调用*/
        routerChain.setInvokers(newInvokers);
    }

    /*xxx: 根据url构造invoker */
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        /*xxx: 遍历提供者地址*/
        for (URL providerUrl : urls) {
            /*xxx: 根据url,构建invoker*/
            invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
        }
    }
}
/*xxx: 抽象协议*/
public abstract class AbstractProtocol implements Protocol {
    
    @Override
    /*xxx: 引用远程服务*/
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        /*xxx: 使用异步转同步的 装饰执行器 */
        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    }

    /*xxx: 通过协议引用远程服务*/
    protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException;
    
}

/*xxx: dubbo协议,实现远程服务调用 */
public class DubboProtocol extends AbstractProtocol {
    @Override
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        /*xxx: 创建一个 dubboInvoker */
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    }
}

/*xxx: dubbo执行器  */
public class DubboInvoker<T> extends AbstractInvoker<T> {
    @Override
    /*xxx: dubbo执行器,实际执行的方法,默认情况下,是一个异步的请求,通过future完成*/
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        ExchangeClient currentClient;
        //xxx: 发送远程请求
        CompletableFuture<AppResponse> appResponseFuture =
                currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
        //xxx: 省略其他抽象...
    }
}

# 服务容器

/*xxx: 服务容器,仅用于本地便捷获取使用,远程调用时不涉及 */
public class ServiceRepository extends LifecycleAdapter implements FrameworkExt {
    /*xxx: 存储服务信息*/
    private ConcurrentMap<String, ServiceDescriptor>  services = new ConcurrentHashMap<>();

    /*xxx: 存储消费者信息 */
    private ConcurrentMap<String, ConsumerModel> consumers = new ConcurrentHashMap<>();

    /*xxx; 存储提供者信息*/
    private ConcurrentMap<String, ProviderModel> providers = new ConcurrentHashMap<>();
    
    //xxx: 略...
}

# spring集成原理

/*xxx: dubbo与spring提供了原生集成*/
public class DubboNamespaceHandler extends NamespaceHandlerSupport implements ConfigurableSourceBeanMetadataElement {
    @Override
    public void init() {
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
    }
    
    //xxx: 省略其他抽象...
}

/*xxx: service工厂*/
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
        ApplicationContextAware, BeanNameAware, ApplicationEventPublisherAware {
    
    //xxx:省略抽象...
}

/*xxx: 抽象注解bean后置处理器: ServiceBean调用export,ReferenceBean调用refer */
public abstract class AbstractAnnotationBeanPostProcessor extends
        InstantiationAwareBeanPostProcessorAdapter implements MergedBeanDefinitionPostProcessor, PriorityOrdered,
        BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware, DisposableBean {
    //xxx: 略...
}

# dubbo扩展点专题

  • ExtensionLoader,ExtensionFactory,@SPI,@Adaptive,@Activate都是dubbo框架的扩展点机制用于实现dubbo的可扩展性

# ExtensionLoader

  • 扩展点加载器,负责加载Dubbo的扩展点接口实现;
  • 动态代理的过程是由其实现的

# 功能

  • 加载扩展点接口的所有实现
  • 缓存扩展点接口的所有实现
  • 获取指定名称的扩展点实现
  • 获取自适应扩展点实现

# 实例

  • 配置文件META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=org.apache.dubbo.rpc.support.MockProtocol
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
http=org.apache.dubbo.rpc.protocol.http.HttpProtocol
rmi=org.apache.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=org.apache.dubbo.rpc.protocol.hessian.HessianProtocol
org.apache.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=org.apache.dubbo.rpc.protocol.thrift.ThriftProtocol
native-thrift=org.apache.dubbo.rpc.protocol.nativethrift.ThriftProtocol
memcached=org.apache.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=org.apache.dubbo.rpc.protocol.redis.RedisProtocol
rest=org.apache.dubbo.rpc.protocol.rest.RestProtocol
xmlrpc=org.apache.dubbo.xml.rpc.protocol.xmlrpc.XmlRpcProtocol
grpc=org.apache.dubbo.rpc.protocol.grpc.GrpcProtocol
registry=org.apache.dubbo.registry.integration.RegistryProtocol
service-discovery-registry=org.apache.dubbo.registry.client.ServiceDiscoveryRegistryProtocol
qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper
package com.automannn.dubbo.extensionLoader;

import org.apache.dubbo.cache.CacheFactory;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.rpc.Protocol;

import java.util.ArrayList;
import java.util.Set;

/**
 * @author automannn
 * @Date 2022/4/18
 */
public class ExtenSionLoaderTest {

    public static void main(String[] args) {
        ExtensionLoader loader = ExtensionLoader.getExtensionLoader(Protocol.class);
        Set<String> supportedExtensions = loader.getSupportedExtensions();
        ArrayList<String> extensionList = new ArrayList<>();

        //获取所有的扩展点实现
        for (String supportedExtension : supportedExtensions) {
            extensionList.add(supportedExtension);
        }
        System.out.println(extensionList.size());
        System.out.println(extensionList);

        //获取指定名称的实现类
        System.out.println(loader.getExtension("http"));

        //获取自适应实现类
        System.out.println(loader.getAdaptiveExtension());

    }
}
16
[dubbo, grpc, hessian, http, injvm, memcached, mock, native-thrift, redis, registry, rest, rmi, service-discovery-registry, thrift, webservice, xmlrpc]
org.apache.dubbo.qos.protocol.QosProtocolWrapper@1efed156
org.apache.dubbo.rpc.Protocol$Adaptive@9660f4e

# ExtensionFactory

  • dubbo扩展点的自适应工厂
  • 用于根据指定的名称和类型,获取对应的扩展点实例
  • 它是通过AdaptiveExtensionFactory实现的

# 功能

  • 根据类型,获取对应的扩展点实例;
  • 根据名称和类型,获取对应的扩展点实例;

# 实例

package com.automannn.dubbo.extensionFactoryTest;

import org.apache.dubbo.common.extension.ExtensionFactory;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.rpc.Protocol;

/**
 * @author automannn
 * @Date 2023/5/23
 */
public class ExtensionFactoryTest {
    public static void main(String[] args) {
        //对于扩展点工厂, 扩展点加载器会特殊处理
        ExtensionFactory extensionFactory =  ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension();
        //对于扩展点工厂, 扩展点加载器会特殊处理
        System.out.println(extensionFactory);

        //获取对象的时候,分别从 spi工厂,spring工厂获取,
        // 其中,spi工厂不论传入生命名称,都是获取的 接口的兼容扩展点实现
        System.out.println(extensionFactory.getExtension(Protocol.class,"dubbo"));
    }
}
org.apache.dubbo.common.extension.factory.AdaptiveExtensionFactory@35851384
org.apache.dubbo.rpc.Protocol$Adaptive@35d176f7

# @SPI

  • 扩展点机制的核心注解,用于标记Dubbo扩展点接口;
  • 用于指定Dubbo扩展点的默认实现类名称,如果用户没有指定其它实现,则将使用默认实现;

# 实例

  • 配置文件META-INF/services/com.automannn.dubbo.handleExtention.HandlerTest
my=com.automannn.dubbo.handleExtention.MyHandlerTest
other=com.automannn.dubbo.handleExtention.OtherHandler
filter=com.automannn.dubbo.handleExtention.FilterHandler
@SPI("my")
public interface HandlerTest {
    void handle(String msg);
}
public class MyHandlerTest implements HandlerTest{
    @Override
    public void handle(String msg) {
        System.out.println("myHandler");
    }
}
public class OtherHandler implements HandlerTest{
    @Override
    public void handle(String msg) {

    }
}
package com.automannn.dubbo.SpiTest;

import com.automannn.dubbo.handleExtention.HandlerTest;
import org.apache.dubbo.common.extension.ExtensionLoader;

/**
 * @author automannn
 * @Date 2023/5/23
 */
public class SpiTest {
    public static void main(String[] args) {
        HandlerTest handlerTest = ExtensionLoader.getExtensionLoader(HandlerTest.class).getExtension("other");

        HandlerTest defaultExtension = ExtensionLoader.getExtensionLoader(HandlerTest.class).getDefaultExtension();

        System.out.println(handlerTest);

        System.out.println(defaultExtension);


    }
}

com.automannn.dubbo.handleExtention.OtherHandler@2a18f23c
com.automannn.dubbo.handleExtention.MyHandlerTest@d7b1517

# @Adaptive

  • Dubbo扩展点机制中的自适应扩展点注解,用于标记Dubbo扩展点接口的自适应扩展点实现;
  • 根据方法的参数类型,或者注解,来决定调用哪个扩展点实现;

# 实例

  • 配置文件META-INF/services/com.automannn.dubbo.handleExtention.HandlerTest
my=com.automannn.dubbo.handleExtention.MyHandlerTest
other=com.automannn.dubbo.handleExtention.OtherHandler
filter=com.automannn.dubbo.handleExtention.FilterHandler
adaptive=com.automannn.dubbo.handleExtention.TheAdaptiveHandler
package com.automannn.dubbo.adaptiveTest;

import com.automannn.dubbo.handleExtention.HandlerTest;
import org.apache.dubbo.common.extension.ExtensionLoader;

/**
 * @author automannn
 * @Date 2023/5/23
 */
public class AdaptiveTest {

    public static void main(String[] args) {
        HandlerTest adaptiveHandler = ExtensionLoader.getExtensionLoader(HandlerTest.class).getAdaptiveExtension();

        System.out.println(adaptiveHandler);
    }
}
com.automannn.dubbo.handleExtention.TheAdaptiveHandler@d7b1517

# @Activate

  • Bubbo扩展点机制中的自动激活扩展点注解,用于标记Dubbo扩展点接口的自动激活扩展点实现;
  • 用于在加载扩展点实现时,自动激活符合条件的扩展点实现;

# 实例

  • 配置文件META-INF/services/com.automannn.dubbo.handleExtention.HandlerTest
my=com.automannn.dubbo.handleExtention.MyHandlerTest
other=com.automannn.dubbo.handleExtention.OtherHandler
filter=com.automannn.dubbo.handleExtention.FilterHandler
adaptive=com.automannn.dubbo.handleExtention.TheAdaptiveHandler
filterA=com.automannn.dubbo.handleExtention.FilterAHandler
filterB=com.automannn.dubbo.handleExtention.FilterBHandler
package com.automannn.dubbo.handleExtention;

import org.apache.dubbo.common.extension.Activate;

/**
 * @author automannn
 * @Date 2023/5/23
 */
@Activate(group = "test",value="auto:testb",order = 4)
public class FilterBHandler implements HandlerTest{
    @Override
    public void handle(String msg) {
        System.out.println("filterB");
    }
}
package com.automannn.dubbo.handleExtention;

import org.apache.dubbo.common.extension.Activate;

/**
 * @author automannn
 * @Date 2023/5/23
 */
@Activate(group = "test",order = 1)
public class FilterAHandler implements HandlerTest{
    @Override
    public void handle(String msg) {
        System.out.println("filterA");
    }
}
package com.automannn.dubbo.activateTest;

import com.automannn.dubbo.handleExtention.HandlerTest;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;

import java.util.List;

/**
 * @author automannn
 * @Date 2023/5/23
 */
public class ActivatedTest {
    public static void main(String[] args) {
        URL url = URL.valueOf("dubbo://127.0.0.1:25088?name=filterA&auto=testb");
        List<HandlerTest> adaptiveHandlerList = ExtensionLoader.getExtensionLoader(HandlerTest.class).getActivateExtension(url,"name","test");

        URL url1 = URL.valueOf("dubbo://127.0.0.1:25088?name=filterA&auto=testa");
        List<HandlerTest> adaptiveHandlerList1 = ExtensionLoader.getExtensionLoader(HandlerTest.class).getActivateExtension(url1,"name","test");

        System.out.println(adaptiveHandlerList);
        System.out.println(adaptiveHandlerList1);
    }
}
[com.automannn.dubbo.handleExtention.FilterBHandler@d7b1517, com.automannn.dubbo.handleExtention.FilterAHandler@16c0663d]
[com.automannn.dubbo.handleExtention.FilterAHandler@16c0663d]

# dubbo消费者专题

# 初始化的触发

# 注解处理环境

  • 基础设置
public abstract class InstantiationAwareBeanPostProcessorAdapter implements SmartInstantiationAwareBeanPostProcessor {
 	@Deprecated
	@Override
	public PropertyValues postProcessPropertyValues(
			PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeansException {

		return pvs;
	}   
}
public abstract class AbstractAnnotationBeanPostProcessor  extends
        InstantiationAwareBeanPostProcessorAdapter {
    
    
    
    @Override
    /*xxx: 后置处理 (核心方法)*/
    public PropertyValues postProcessPropertyValues(
            PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException {
     	   InjectionMetadata metadata = findInjectionMetadata(beanName, bean.getClass(), pvs);
        	/*xxx: 对需要注入的元数据进行注入*/
            metadata.inject(bean, beanName, pvs);
    }
    
    /*xxx: 查找需要注解注入的元数据*/
    private InjectionMetadata findInjectionMetadata(String beanName, Class<?> clazz, PropertyValues pvs) {
     	/*xxx: 构建元数据*/
        return buildAnnotatedMetadata(clazz);   
    }
    
    private AnnotatedInjectionMetadata buildAnnotatedMetadata(final Class<?> beanClass) {
        /*xxx: 通过属性查找需要注入的属性*/
        Collection<AnnotatedFieldElement> fieldElements = findFieldAnnotationMetadata(beanClass);
        /*xxx: 通过方法查找需要注入的属性*/
        Collection<AnnotatedMethodElement> methodElements = findAnnotatedMethodMetadata(beanClass);
        return new AnnotatedInjectionMetadata(beanClass, fieldElements, methodElements);
    }
    
    /*xxx: 查找根据属性注入的方法*/
    private List<AnnotatedFieldElement> findFieldAnnotationMetadata(final Class<?> beanClass) {
        
        final List<AnnotatedFieldElement> elements = new LinkedList<AnnotatedFieldElement>();
        
     	/*xxx: 遍历需要处理的注解*/
        for (Class<? extends Annotation> annotationType : getAnnotationTypes()) {
        	      /*xxx: 获取注解中的属性*/
                 AnnotationAttributes attributes = getAnnotationAttributes(field, annotationType, getEnvironment(), true, true);   
            	/*xxx: 构造成 注解元素配置信息,并保存 */
                elements.add(new AnnotatedFieldElement(field, attributes));
        }
    }
}
/*xxx: dubbo的 @reference 注解后置处理器 */
public class ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor implements
        ApplicationContextAware {
 	/*xxx: 注解后置处理器处理的注解类型 */
    public ReferenceAnnotationBeanPostProcessor() {
        super(DubboReference.class, Reference.class, com.alibaba.dubbo.config.annotation.Reference.class);
    }   
}
  • 自动注入的处理(核心)
/*xxx: 为方法注入属性*/
private class AnnotatedMethodElement extends InjectionMetadata.InjectedElement {
     	@Override
        protected void inject(Object bean, String beanName, PropertyValues pvs) throws Throwable {
         	Object injectedObject = getInjectedObject(attributes, bean, beanName, injectedType, this);  
            /*xxx: 方法的注入,通过反射进行*/
            method.invoke(bean, injectedObject);
        }
    
    /*xxx: 从指定的注解或者 bean中,获取 注入对象*/
    //xxx: 这个方法位于 AbstractAnnotationBeanPostProcessor 中
    protected Object getInjectedObject(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
                                       InjectionMetadata.InjectedElement injectedElement) throws Exception {
     	   return doGetInjectedBean(attributes, bean, beanName, injectedType, injectedElement);
    }
    
    /*xxx: 子类通过该方法获取注入的对象*/
    protected abstract Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
                                                InjectionMetadata.InjectedElement injectedElement) throws Exception;
}
/*xxx: dubbo的 @reference 注解后置处理器 */
public class ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor implements
        ApplicationContextAware {
    
 	@Override
    /*xxx: 通过该方法获取注入的对象*/
    protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
                                       InjectionMetadata.InjectedElement injectedElement) throws Exception {
     	/*xxx: 获取引用的bean的名称 */
        String referenceBeanName = getReferenceBeanName(attributes, injectedType);

        ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType);

        /*xxx: 验证是否是本地的bean */
        boolean localServiceBean = isLocalServiceBean(referencedBeanName, referenceBean, attributes);

        /*xxx: referenceBean前置准备*/
        prepareReferenceBean(referencedBeanName, referenceBean, localServiceBean);

        /*xxx: 注册 referenceBean到上下文 */
        registerReferenceBean(referencedBeanName, referenceBean, attributes, localServiceBean, injectedType);

        /*xxx: 为 referenceBean注入缓存信息*/
        cacheInjectedReferenceBean(referenceBean, injectedElement);

        return referenceBean.get();   
    }
}

# bean环境

/*xxx: reference工厂*/
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean,
        ApplicationContextAware, InitializingBean, DisposableBean {
	@Override
    public Object getObject() {
        return get();
    }            
}
public abstract class ReferenceConfigBase<T> extends AbstractReferenceConfig {
 	/*xxx: 获取代理*/
    public abstract T get();

    public abstract void destroy();   
}

# 初始化流程

# 基础设施

public abstract class ReferenceConfigBase<T> extends AbstractReferenceConfig {
 	/*xxx: 获取代理*/
    public abstract T get();

    public abstract void destroy();
}
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
 	//xxx: 实际的接口代理对象
    private transient volatile T ref;
    
    public synchronized T get() {
     	/*xxx: referenceConfig也有ref的关键属性,作用类似于ServiceConfig中的ref,实际上是一个代理对象*/
        if (ref == null) {
            /*xxx: 初始化*/
            init();
        }
        return ref;   
    }
    
    public synchronized void init() {
     	/*xxx: 初始化dubboBootstrap */
        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.init();
        }
        Map<String, String> map = new HashMap<String, String>();
        /*xxx: 设置服务的类型,为消费者端*/
        map.put(SIDE_KEY, CONSUMER_SIDE);
        /*xxx: 获取消费者接口的所有方法 */
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
        
        map.put("interface", interfaceName);
        
        /*xxx: 设置注册到注册中心的本地地址 */
        String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
        /*xxx: 如果没有在系统层面进行配置,则获取本机的地址*/
        if (StringUtils.isEmpty(hostToRegistry)) {
            hostToRegistry = NetUtils.getLocalHost();
        }
        map.put(REGISTER_IP_KEY, hostToRegistry);
        
        /*xxx: 根据接口信息,创建服务代理 */
        /*xxx: 核心*/
        ref = createProxy(map);
        
        /*xxx: 检测服务是否正常注册 */
        checkInvokerAvailable();
        
    }
}

# 创建代理

public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
    
    /*xxx: 代理工厂扩展点*/
    private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
 
    /*xxx: 根据服务元数据,创建代理 */
    private T createProxy(Map<String, String> map) {
     	/*xxx: 根据实际的协议进行引用 */
        invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));   
        
        /*xxx: 检测注册中心*/
        checkRegistry();
                    
        return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    }
    
}

# 协议调用

/*xxx: 注册中心协议 */
public class RegistryProtocol implements Protocol {
    
    @Override
    /*xxx: 注册中心引用 */
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        Registry registry = registryFactory.getRegistry(url);

        /*xxx: 获取提供者集群,没指定的情况下,使用 MockCluster*/
        Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
        return doRefer(cluster, registry, type, url);
    }

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);

        /*xxx: 消费者是否需要注册 */
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(subscribeUrl);
            /*xxx: 将消费者进行注册*/
            registry.register(directory.getRegisteredConsumerUrl());
        }

        /*xxx: 订阅生产者节点 */
        directory.subscribe(toSubscribeUrl(subscribeUrl));

        /*xxx: 获取代理 */
        Invoker<T> invoker = cluster.join(directory);
    }
}

# 注册中心获取

  • 工厂配置,位于META-INF/dubbo/com.alibaba.dubbo.registry.RegistryFactory
spring-cloud=com.alibaba.cloud.dubbo.registry.SpringCloudRegistryFactory
@SPI("dubbo")
public interface RegistryFactory {
 	@Adaptive({"protocol"})
    Registry getRegistry(URL url);   
}
public abstract class AbstractRegistryFactory implements RegistryFactory {
 	
    @Override
    public Registry getRegistry(URL url) {
     	url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(EXPORT_KEY, REFER_KEY)
                .build();
        
        registry = createRegistry(url);
    }
    
    protected abstract Registry createRegistry(URL url);
}
/*xxx: 基于springCloud的注册中心 */
public class SpringCloudRegistryFactory extends AbstractRegistryFactory {
 	private DiscoveryClient discoveryClient;
    
    @Override
	/*xxx: 创建注册中心*/
	public Registry createRegistry(URL url) {
		//xxx: 初始化
		init();
     	/*xxx: 使用dubboCloud作为注册中心实现 "*/
		registry = new DubboCloudRegistry(url, discoveryClient,
					dubboServiceMetadataRepository, dubboMetadataConfigServiceProxy,
					jsonUtils, applicationContext);   
    }
}

# 服务可达性检测

/*xxx: 消费者接口层配置*/
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
 	
    /*xxx: 创建服务消费者是否正常进行了订阅*/
    private void checkInvokerAvailable() throws IllegalStateException {
        if (shouldCheck() && !invoker.isAvailable()) {
            invoker.destroy();
            throw new IllegalStateException("Failed to check the status of the service "
                    + interfaceName
                    + ". No provider available for the service "
                    + (group == null ? "" : group + "/")
                    + interfaceName +
                    (version == null ? "" : ":" + version)
                    + " from the url "
                    + invoker.getUrl()
                    + " to the consumer "
                    + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
    }
}
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
 	@Override
    public boolean isAvailable() {
        if (isDestroyed()) {
            return false;
        }
        Map<String, Invoker<T>> localUrlInvokerMap = urlInvokerMap;
        if (localUrlInvokerMap != null && localUrlInvokerMap.size() > 0) {
            for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) {
                if (invoker.isAvailable()) {
                    return true;
                }
            }
        }
        return false;
    }   
}

# 消费者订阅

/*xxx: 注册中心协议 */
public class RegistryProtocol implements Protocol {
 	   
     @Override
    /*xxx: 注册中心引用 */
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
     	  
        return doRefer(cluster, registry, type, url);
    }
    
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
     	RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        URL subscribeUrl = new URL("consumer", parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        
        /*xxx: 订阅生产者节点 */
        directory.subscribe(toSubscribeUrl(subscribeUrl));
        
        
        //xxx: 注意,是先订阅,再获取的代理
        /*xxx: 获取代理 */
        Invoker<T> invoker = cluster.join(directory);
    }
}
/*xxx:注册中心路径抽象 */
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
 	
    public void subscribe(URL url) {
     	registry.subscribe(url, this);   
    }
    
    @Override
    public synchronized void notify(List<URL> urls) {
     	/*xxx: 获取提供者url信息*/
        List<URL> providerURLs = categoryUrls.getOrDefault("providers", Collections.emptyList());
        
        /*xxx: 根据提供者的url, 更新invoker信息*/
        refreshOverrideAndInvoker(providerURLs);
    }
    
    private void refreshOverrideAndInvoker(List<URL> urls) {
     	refreshInvoker(urls);   
    }
    
    /*xxx: 通过订阅节点信息,动态更新 invokers */
    private void refreshInvoker(List<URL> invokerUrls) {
     	/*xxx: 将url 转换为 Invoker*/
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
        
        this.urlInvokerMap = newUrlInvokerMap;
   
    }
}

# dubboCloud专题

# 自动配置项

# 元数据自动配置

@Import({ DubboServiceMetadataRepository.class, IntrospectiveDubboMetadataService.class,DubboMetadataServiceProxy.class })
/*xxx: dubbo元数据自动配置 */
public class DubboMetadataAutoConfiguration {

}

# 注册中心

/*xxx: springCloud-dubbo的 注册中心配置*/
public class DubboServiceRegistrationAutoConfiguration {
 	
    @Bean
	/*xxx: 如果自定义配置中,没有配置 spring-cloud注册中心,则要将该注册中心进行注册 */
	@Conditional({ MissingSpringCloudRegistryConfigPropertyCondition.class })
	/*xxx: 进行 springCloud 注册中心配置*/
	/*xxx: spirngCloud的核心操作 */
	public RegistryConfig defaultSpringCloudRegistryConfig() {
		return new RegistryConfig(ADDRESS, PROTOCOL);
	}
    
    @EventListener(ServiceInstancePreRegisteredEvent.class)
	/*xxx: 服务实例预注册 */
	public void onServiceInstancePreRegistered(ServiceInstancePreRegisteredEvent event) {
        
     	attachDubboMetadataServiceMetadata(registration);   
    }
    
    /*xxx: 添加dubbo服务元数据 */
	private void attachDubboMetadataServiceMetadata(Registration registration) {
     	if (registration == null) {
			return;
		}
		synchronized (registration) {
			/*xxx: 获取注册表元数据 */
			Map<String, String> metadata = registration.getMetadata();
			attachDubboMetadataServiceMetadata(metadata);
		}   
    }
    
    private void attachDubboMetadataServiceMetadata(Map<String, String> metadata) {
		/*xxx: 获取dubbo服务 元数据*/
		Map<String, String> serviceMetadata = dubboServiceMetadataRepository
				.getDubboMetadataServiceMetadata();
		if (!isEmpty(serviceMetadata)) {
			/*xxx: 将dubbo服务 元数据保存至注册表元数据中 */
			metadata.putAll(serviceMetadata);
		}
	}
    
    
}

# 泛型工厂配置

@EnableConfigurationProperties(DubboCloudProperties.class)
/*xxx: 服务自动注册*/
public class DubboServiceAutoConfiguration {
 	@Bean
	@ConditionalOnMissingBean
	/*xxx: 泛型服务工厂*/
	public DubboGenericServiceFactory dubboGenericServiceFactory() {
		return new DubboGenericServiceFactory();
	}
 
}

# 服务自动发现

/*xxx: 服务自动发现配置 */
public class DubboServiceDiscoveryAutoConfiguration {
 	  
    @EventListener(HeartbeatEvent.class)
	public void onHeartbeatEvent(HeartbeatEvent event) {
     	Stream<String> subscribedServices = dubboServiceMetadataRepository
				.initSubscribedServices();

		heartbeatEventChangedPredicate.ifAvailable(predicate -> {
			if (predicate.test(event)) {
				//xxx: 分发服务实例变更事件
				subscribedServices.forEach(serviceName -> {
					List<ServiceInstance> serviceInstances = getInstances(serviceName);
					dispatchServiceInstancesChangedEvent(serviceName, serviceInstances);
				});
			}
		});   
    }
}

# 注册中心配置原理

# dubboCloud注册中心工厂

  • 配置文件META-INF/dubbo/com.alibaba.dubbo.registry.RegistryFactory
spring-cloud=com.alibaba.cloud.dubbo.registry.SpringCloudRegistryFactory
/*xxx: 基于springCloud的注册中心 */
public class SpringCloudRegistryFactory extends AbstractRegistryFactory {
 	
    /*xxx: 注册中心初始化 */
	protected void init() {
		/*xxx: 服务发现客户端*/
		this.discoveryClient = applicationContext.getBean(DiscoveryClient.class);
		/*xxx: dubbo服务元数据仓库 */
		this.dubboServiceMetadataRepository = applicationContext
				.getBean(DubboServiceMetadataRepository.class);
        
    }
    
    @Override
	/*xxx: 创建注册中心*/
	public Registry createRegistry(URL url) {
     	
        //xxx: 初始化
		init();
        
        /*xxx: 使用dubboCloud作为注册中心实现 "*/
		registry = new DubboCloudRegistry(url, discoveryClient,
					dubboServiceMetadataRepository, dubboMetadataConfigServiceProxy,
					jsonUtils, applicationContext);
        
        return registry;
    }
}

# dubboCloud注册中心

/*xxx: dubboCloud注册中心*/
public class DubboCloudRegistry extends FailbackRegistry
        implements ApplicationListener<ServiceInstancesChangedEvent> {
    
 	/*xxx: 服务发现客户端 */
    private final DiscoveryClient discoveryClient;

    /*xxx: dubbo元数据仓库, 所有的操作,都是围绕其进行 */
    private final DubboServiceMetadataRepository repository;
    
    
    @Override
    /*xxx: 注册服务 */
    public final void doRegister(URL url) {
     	synchronized (this) {
            /*xxx: 初始化*/
            preInit();
            if (shouldNotRegister(url)) {
                return;
            }
            /*xxx: 生产者进行注册, 注册到元数据仓库中的(注册到本地)*/
            repository.exportURL(url);
        }   
    }
    
    private void preInit() {
         //xxx: 如果初始化时,已经存在订阅处理器实例,则进行初始化
         urlSubscribeHandlerMap.forEach((url, handler) -> handler.init());
         /*xxx: 初始化元数据*/
         repository.initializeMetadata();
        
        /*xxx: 监听实例变更事件 */
        applicationContext.addApplicationListener(this);
    }
    
   
    @Override
    //xxx: 消费者操作的端口,消费者通过 注册中心进行订阅
    public final void doSubscribe(URL url, NotifyListener listener) {
     	synchronized (this) {
            preInit();
         	 /*xxx: 如果是监控订阅*/
            if (isAdminURL(url)) {
             	//xxx: 略   
            }else if (isDubboMetadataServiceURL(url) && containsProviderCategory(url)) {
             	String appName = getServiceName(url);
                /*xxx: 如果是元数据订阅*/
                MetadataServiceSubscribeHandler handler = new MetadataServiceSubscribeHandler(
                        appName, url, listener, this, dubboMetadataUtils);
                if (inited.get()) {
                    handler.init();
                }
                metadataSubscribeHandlerMap.put(appName, handler);   
            }/*xxx: 如果是客户端订阅, 则所有的代理实现,由 GeneralServiceSubsribeHandler实现, 可以类比 dubbo的 泛型服务*/
            else if (isConsumerServiceURL(url)) {
             	GenearalServiceSubscribeHandler handler = new GenearalServiceSubscribeHandler(
                        url, listener, this, repository, jsonUtils,
                        dubboMetadataConfigServiceProxy);
                if (inited.get()) {
                    /*xxx: 进行初始化*/
                    handler.init();
                }
                /*xxx: 将路径 与 订阅处理器实例进行绑定*/
                urlSubscribeHandlerMap.put(url, handler);   
            }
            
            
        }
    }
}

# 元数据仓库与元数据服务

# 元数据仓库

@Repository
/*xxx: 元数据仓库 */
public class DubboServiceMetadataRepository
		implements SmartInitializingSingleton, ApplicationEventPublisherAware {
 	
    /*xxx: 初始化元数据*/
	public void initializeMetadata() {
     	doGetSubscribedServices().forEach(this::initializeMetadata);   
    }
    
    public void initializeMetadata(String serviceName) {
		/*xxx: 初始化dubbo的rest服务元数据*/
		initDubboRestServiceMetadataRepository(serviceName);
	}
    
    protected void initDubboRestServiceMetadataRepository(String serviceName) {
     	/*xxx: 获取服务实例的元数据信息 */
		Set<ServiceRestMetadata> serviceRestMetadataSet = getServiceRestMetadataSet(
				serviceName);
        
        for (ServiceRestMetadata serviceRestMetadata : serviceRestMetadataSet) {

			/*xxx: 将dubboServiceMetadata的服务元数据 写入 微服务实例的元数据中*/
			serviceRestMetadata.getMeta().forEach(restMethodMetadata -> {
				RequestMetadata requestMetadata = restMethodMetadata.getRequest();
				RequestMetadataMatcher matcher = new RequestMetadataMatcher(
						requestMetadata);
				DubboRestServiceMetadata metadata = new DubboRestServiceMetadata(
						serviceRestMetadata, restMethodMetadata);
				/*xxx: 保存服务元数据信息 */
				metadataMap.put(matcher, metadata);
			});
		}
    }
    
    
    /*xxx: 注册服务到注册中心(暴露服务)*/
	public void exportURL(URL url) {
        URL actualURL = url;
        
     	/*xxx: 将ip 进行拆分 */
		InetUtils.HostInfo hostInfo = inetUtils.findFirstNonLoopbackHostInfo();
        
        String ipAddress = hostInfo.getIpAddress();
        
        if (!Objects.equals(url.getHost(), ipAddress)) {
			actualURL = url.setHost(ipAddress);
		}
        
        /*xxx: 添加服务信息, 通过将serviceKey 与 url进行映射存储 */
		/*xxx: serviceKey 由接口名+组名+版本号 组成*/
		this.allExportedURLs.add(actualURL.getServiceKey(), actualURL);
    }
}

# 元数据服务

public class IntrospectiveDubboMetadataService implements DubboMetadataService {
 	@Autowired
	/*xxx: 元数据仓库*/
	private ObjectProvider<DubboServiceMetadataRepository> dubboServiceMetadataRepository;
    
    
    @Override
	/*xxx: 获取服务提供者信息*/
	public String getExportedURLs(String serviceInterface, String group, String version) {
		List<URL> urls = getRepository().getExportedURLs(serviceInterface, group,
				version);
		return jsonUtils.toJSON(urls);
	}

	private DubboServiceMetadataRepository getRepository() {
		return dubboServiceMetadataRepository.getIfAvailable();
	}
}

# 元素据服务代理

/*xxx: 服务元数据代理*/
public class DubboMetadataServiceProxy implements BeanClassLoaderAware, DisposableBean {
 	/*xxx: 服务发现客户端 */
	private final DiscoveryClient discoveryClient;
    
    /*xxx: 通过服务实例名 获取元数据服务 */
	private DubboMetadataService getProxy0(String serviceName) {
		return getProxy(discoveryClient.getInstances(serviceName));
	}
    
    /*xxx: 通过服务实例 获取元数据服务 代理 */
	public DubboMetadataService getProxy(List<ServiceInstance> serviceInstances) {
     			/*xxx: 遍历 获取元数据服务路径 */
				List<URL> dubboMetadataServiceURLs = getDubboMetadataServiceURLs(
						serviceInstance.get());

				for (URL dubboMetadataServiceURL : dubboMetadataServiceURLs) {
					/*xxx: 根据元数据服务路径,创建元数据服务 代理 */
					DubboMetadataService dubboMetadataService = createProxyIfAbsent(
							dubboMetadataServiceURL);
					if (dubboMetadataService != null) {
						return dubboMetadataService;
					}
				}   
    }
    
    /*xxx: 创建元数据服务代理 */
	private DubboMetadataService createProxyIfAbsent(URL dubboMetadataServiceURL) {
		String serviceName = dubboMetadataServiceURL.getParameter(APPLICATION_KEY);
		String version = dubboMetadataServiceURL.getParameter(VERSION_KEY);
		/*xxx: 创建元数据服务代理  */
		return createProxyIfAbsent(serviceName, version);
	}
    
    /*xxx: 创建代理 */
	protected DubboMetadataService createProxy(String serviceName, String version) {

     	return (DubboMetadataService) newProxyInstance(classLoader,
				new Class[] { DubboMetadataService.class },
				/*xxx: 代理实现*/
				new DubboMetadataServiceInvocationHandler(serviceName, version,
						dubboGenericServiceFactory));   
    }
}

# 元数据服务代理实现

/*xxx: dubbo元数据代理实现 */
class DubboMetadataServiceInvocationHandler implements InvocationHandler {
 	DubboMetadataServiceInvocationHandler(String serviceName, String version,
			DubboGenericServiceFactory dubboGenericServiceFactory) {
		/*xxx: 通过dubbo泛型工厂,创建 dubbo泛型服务 */
		this.genericService = dubboGenericServiceFactory.create(serviceName,
				DubboMetadataService.class, version);
	}
    
    @Override
	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
     	/*xxx: 通过泛型服务调用指定的方法*/
		returnValue = genericService.$invoke(method.getName(),
					getParameterTypes(method), args);   
    }
}

# 泛型服务工厂

/*xxx: 泛型工厂*/
public class DubboGenericServiceFactory {
 	@Autowired
	/*xxx: 注册中心配置,为生产 dubbo泛型服务提供支持 */
	private ObjectProvider<List<RegistryConfig>> registryConfigs;
    
    /*xxx: 创建泛型服务 */
	public GenericService create(String serviceName, Class<?> serviceClass,
			String version) {
     	String interfaceName = serviceClass.getName();
		ReferenceBean<GenericService> referenceBean = build(interfaceName, version,
				serviceName, emptyMap());
        
        return referenceBean.get();
    }
    
    /*xxx: 构建消费者*/
	private ReferenceBean<GenericService> build(String interfaceName, String version,
			String group, Map<String, Object> dubboTranslatedAttributes) {
     	String key = createKey(interfaceName, version, group, dubboTranslatedAttributes);

		return cache.computeIfAbsent(key, k -> {
			ReferenceBean<GenericService> referenceBean = new ReferenceBean<>();
			referenceBean.setGeneric(true);
			referenceBean.setInterface(interfaceName);
			referenceBean.setVersion(version);
			referenceBean.setGroup(group);
			referenceBean.setCheck(false);
			bindReferenceBean(referenceBean, dubboTranslatedAttributes);
			return referenceBean;
		});   
        
    }
    
    
    /*xxx: 配置消费者 */
	private void bindReferenceBean(ReferenceBean<GenericService> referenceBean,
			Map<String, Object> dubboTranslatedAttributes) {
        
        DataBinder dataBinder = new DataBinder(referenceBean);
        
        // ignore "registries" field and then use RegistryConfig beans
		dataBinder.setDisallowedFields("registries");
     	
        registryConfigs.ifAvailable(referenceBean::setRegistries);
        
    }
}

# 服务上线

/*xxx: dubboCloud注册中心*/
public class DubboCloudRegistry extends FailbackRegistry
        implements ApplicationListener<ServiceInstancesChangedEvent> {
 	/*xxx: dubbo元数据仓库, 所有的操作,都是围绕其进行 */
    private final DubboServiceMetadataRepository repository;
    
    
    @Override
    /*xxx: 注册服务 */
    public final void doRegister(URL url) {
        synchronized (this) {
            /*xxx: 初始化*/
            preInit();
            if (shouldNotRegister(url)) {
                return;
            }
            /*xxx: 生产者进行注册, 注册到元数据仓库中的(注册到本地)*/
            repository.exportURL(url);
        }
    }	
    
}
@Repository
/*xxx: 元数据仓库 */
public class DubboServiceMetadataRepository
		implements SmartInitializingSingleton, ApplicationEventPublisherAware {
    
 	/*xxx: 注册服务到注册中心(暴露服务)*/
	public void exportURL(URL url) {
     	URL actualURL = url;
		/*xxx: 将ip 进行拆分 */
		InetUtils.HostInfo hostInfo = inetUtils.findFirstNonLoopbackHostInfo();
		String ipAddress = hostInfo.getIpAddress();
        
        if (!Objects.equals(url.getHost(), ipAddress)) {
			actualURL = url.setHost(ipAddress);
		}
        
        /*xxx: 添加服务信息, 通过将serviceKey 与 url进行映射存储 */
		/*xxx: serviceKey 由接口名+组名+版本号 组成*/
		this.allExportedURLs.add(actualURL.getServiceKey(), actualURL);
    }
}

# 消费订阅

/*xxx: dubboCloud注册中心*/
public class DubboCloudRegistry extends FailbackRegistry
        implements ApplicationListener<ServiceInstancesChangedEvent> {
    
    private void preInit() {
         //xxx: 如果初始化时,已经存在订阅处理器实例,则进行初始化
         urlSubscribeHandlerMap.forEach((url, handler) -> handler.init());
         /*xxx: 初始化元数据*/
         repository.initializeMetadata();
        
        /*xxx: 监听实例变更事件 */
        applicationContext.addApplicationListener(this);
    }
    
 	@Override
    //xxx: 消费者操作的端口,消费者通过 注册中心进行订阅
    public final void doSubscribe(URL url, NotifyListener listener) {
     	synchronized (this) {
            preInit();
         	 /*xxx: 如果是监控订阅*/
            if (isAdminURL(url)) {
             	//xxx: 略   
            }else if (isDubboMetadataServiceURL(url) && containsProviderCategory(url)) {
             	String appName = getServiceName(url);
                /*xxx: 如果是元数据订阅*/
                MetadataServiceSubscribeHandler handler = new MetadataServiceSubscribeHandler(
                        appName, url, listener, this, dubboMetadataUtils);
                if (inited.get()) {
                    handler.init();
                }
                metadataSubscribeHandlerMap.put(appName, handler);   
            }
            /*xxx: 如果是客户端订阅, 则所有的代理实现,由 GeneralServiceSubsribeHandler实现, 可以类比 dubbo的 泛型服务*/
            else if (isConsumerServiceURL(url)) {
             	GenearalServiceSubscribeHandler handler = new GenearalServiceSubscribeHandler(
                        url, listener, this, repository, jsonUtils,
                        dubboMetadataConfigServiceProxy);
                if (inited.get()) {
                    /*xxx: 进行初始化*/
                    handler.init();
                }
                /*xxx: 将路径 与 订阅处理器实例进行绑定*/
                urlSubscribeHandlerMap.put(url, handler);   
            }
            
            
        }
    }
    
}

# 泛型订阅处理器

/*xxx: 服务订阅处理器 */
public abstract class AbstractServiceSubscribeHandler {
 	public void init() {
		if (inited.compareAndSet(false, true)) {
			doInit();
		}
	}

	protected abstract void doInit();   
}
//xxx: 泛型服务订阅处理器
public class GenearalServiceSubscribeHandler extends AbstractServiceSubscribeHandler {
    
 	//xxx: 泛型服务订阅处理初始化
	public synchronized void doInit() {
     	   /*xxx: 获取所有服务实例中,与当前 springCloud 兼容的 dubbo提供者  */
		Map<String, Map<String, List<ServiceInstance>>> map = registry
				.getServiceRevisionInstanceMap();
        
        for (Map.Entry<String, Map<String, List<ServiceInstance>>> entry : map
				.entrySet()) {
			String appName = entry.getKey();
            /*xxx: 获取当前服务的服务集群*/
			Map<String, List<ServiceInstance>> revisionMap = entry.getValue();

			for (Map.Entry<String, List<ServiceInstance>> revisionEntity : revisionMap
					.entrySet()) {
				String revision = revisionEntity.getKey();
				List<ServiceInstance> instances = revisionEntity.getValue();
				/*xxx: 订阅对应服务实例上的提供者*/
				init(appName, revision, instances);
			}
		}
        
        /*xxx: 获取到服务提供者信息后 进行refresh操作 */
		refresh();
    }
    
    public synchronized void refresh() {
		List<URL> urls = getProviderURLs();
		/*xxx: 该操作,更新 directory的 提供者 invokers */
		notifyAllSubscribedURLs(url, urls, listener);
	}
    
    public void init(String appName, String revision,
			List<ServiceInstance> instanceList) {
		/*xxx: 获取 服务提供者信息  */
		List<URL> urls = getTemplateExportedURLs(url, revision, instanceList);
		if (urls != null && urls.size() > 0) {
			addAppNameWithRevision(appName, revision);
			/*xxx: 缓存服务提供者信息 */
			setUrlTemplate(appName, revision, urls);
		}
	}
    
    private List<URL> getTemplateExportedURLs(URL subscribedURL, String revision,
			List<ServiceInstance> serviceInstances) {
     	/*xxx: 获取dubbo元数据服务, 这个是通过dubbo调用获取到的结果*/
		DubboMetadataService dubboMetadataService = getProxy(serviceInstances);
        
        List<URL> templateExportedURLs = emptyList();
        
        /*xxx: 获取到服务提供者信息*/
		templateExportedURLs = getExportedURLs(dubboMetadataService, revision,
					subscribedURL);
        
        return templateExportedURLs;
    }
    
    private List<URL> getExportedURLs(DubboMetadataService dubboMetadataService,
			String revision, URL subscribedURL) {
        
        String serviceInterface = subscribedURL.getServiceInterface();
		String group = subscribedURL.getParameter(GROUP_KEY);
		String version = subscribedURL.getParameter(VERSION_KEY);
        
     	RpcContext.getContext().setAttachment(SCA_REVSION_KEY, revision);
		//xxx: 此处调用 metadataDataService获取元数据信息
		//xxx: metaDataService是一个消费者,这个最终会调用 dubbo获取
		//xxx: 最终获取的动作是,在对应服务实例 内存中的  一个 map属性获取, 也就是 metaDataRepository
		//xxx: 返回服务提供者的节点信息
		String exportedURLsJSON = dubboMetadataService.getExportedURLs(serviceInterface,
				group, version);
        
        String subscribedProtocol = subscribedURL.getParameter(PROTOCOL_KEY);
		//xxx: 将该服务提供者中,所有不需要协议,或者与消费者协议相匹配的,都筛选出来
		return jsonUtils.toURLs(exportedURLsJSON).stream()
				.filter(exportedURL -> subscribedProtocol == null
						|| subscribedProtocol.equalsIgnoreCase(exportedURL.getProtocol()))
				.collect(Collectors.toList());
    }
}

# dubbo调用链路调试

  • 消费者:调用应用接口 -> InvokerInvocationHandler.invoke -> 一系列Invoker.invoke -> 一系列Filter,包括ConsumerContextFilter.invoke -> AsyncToSyncInvoker.invoke -> DubboInvoker.invoke -> LazyConnectExchangeClient.request -> AbstractPeer.send -> NettyChannel.send
  • 生产者:thread.run -> ThreadPoolExecutor.runWorker -> org.apache.dubbo.remoting.transport.DecodeHandler.received -> HeaderExchangeHandler.received -> HeaderExchangeHandler.handleRequest -> DubboProtocol.reply -> 一系列dubbo的filter的invoke -> DelegateProviderMetaDataInvoker.invoke -> JavassistProxyFactory.doInvoke -> 应用方法