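# Overview
- Spring Cloud is an ordered collection of frameworks: a toolkit for developing distributed systems that is easy to understand, deploy, and maintain.
- It combines mature, production-proven service frameworks developed by various companies, which simplifies building distributed-system infrastructure.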
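# Composition
- Spring Boot-style wrappers and abstractions over existing mature frameworks
- Implementations of some distributed-system infrastructure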
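# History
- The microservices concept traces back to the article "Microservices" written by Martin Fowler in March 2014.
- Spring Cloud uses a BOM (Bill of Materials) to manage the list of projects in each release. To avoid confusion with the version numbers of its sub-projects, releases are identified by name, and the names are taken from London Underground stations.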
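# Version management
Release trains are named after London Underground stations; the later the first letter falls in the alphabet, the newer the release.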
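
The naming rule above can be sketched in a few lines of Java. This is only an illustration: `ReleaseTrains` and `newest` are hypothetical names, and the sketch assumes purely name-based release trains (it does not handle numeric versions such as 2020.0.0).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper: orders name-based release trains by their first letter. */
final class ReleaseTrains {
    private ReleaseTrains() {}

    /** A later initial letter means a newer release train. */
    static final Comparator<String> BY_INITIAL =
            Comparator.comparingInt(name -> Character.toUpperCase(name.charAt(0)));

    /** Returns the newest train name from a non-empty list. */
    static String newest(List<String> names) {
        List<String> sorted = new ArrayList<>(names);
        sorted.sort(BY_INITIAL);
        return sorted.get(sorted.size() - 1);
    }
}
```

For example, `ReleaseTrains.newest(List.of("Camden", "Hoxton", "Angel"))` picks `Hoxton`, because H comes after both A and C.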
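# Release qualifiers
- BUILD-XXX: development build, used inside the development team; unstable
- GA: general-availability build; stable and ready for general use
- PRE (M1, M2): milestone builds, each fixing some bugs; there are usually several milestones before a GA release
- RC: release candidate, the observation period before the final release; only serious bugs are fixed
- SR: service release, a maintenance release published after the GA release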
# Version history
Version | Release date | Spring Cloud infrastructure version | Spring Boot version |
---|---|---|---|
Angel.SR5 | 2016.01 | 1.0.4.RELEASE | 1.2.8.RELEASE |
Brixton.RELEASE | 2016.05 | 1.1.0.RELEASE | 1.3.4.RELEASE |
Camden.RELEASE | 2016.09 | 1.1.3.RELEASE | 1.3.7.RELEASE |
Dalston.RELEASE | 2017.04 | 1.2.0.RELEASE | 1.5.2.RELEASE |
Edgware.RELEASE | 2017.11 | 1.3.0.RELEASE | ------ |
Finchley.RELEASE | 2018.06 | 2.0.0.RELEASE | ------ |
Greenwich.RELEASE | 2019.01 | 2.1.0.RELEASE | ------ |
Hoxton.RELEASE | 2019.11 | 2.2.0.RELEASE | 2.2.1.RELEASE |
2020.0.0 | 2020.12 | 3.0.0 | 2.4.1 |
2021.0.0 | 2021.12 | 3.1.0 | 2.6.1 |
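# Source code
Spring Cloud follows a dependency design similar to Spring Boot's. A Spring Cloud environment is built on two foundation modules:
- the spring-cloud-context module
- the spring-cloud-commons module

In essence, Spring Cloud can be seen as a deep customization of Spring Boot.
# Configuration center
# How it works
# Starting from Spring Boot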
public class SpringApplication {
public ConfigurableApplicationContext run(String... args) {
//xxx: other details omitted...
ConfigurableEnvironment environment = prepareEnvironment(listeners, bootstrapContext, applicationArguments);
//xxx: other details omitted...
}
private ConfigurableEnvironment prepareEnvironment(SpringApplicationRunListeners listeners,
DefaultBootstrapContext bootstrapContext, ApplicationArguments applicationArguments) {
//xxx: other details omitted...
/*xxx: get or create the environment */
ConfigurableEnvironment environment = getOrCreateEnvironment();
/*xxx: notify listeners that the environment has been prepared */
listeners.environmentPrepared(bootstrapContext, environment);
//xxx: other details omitted...
return environment;
}
}
public class SpringApplicationRunListeners {
private final List<SpringApplicationRunListener> listeners;
void environmentPrepared(ConfigurableBootstrapContext bootstrapContext, ConfigurableEnvironment environment) {
//xxx: delegates to each listener
}
}
/*xxx: by default Spring Boot registers only one run listener, EventPublishingRunListener */
/*xxx: this interface provides event callbacks for each phase of the run method */
public interface SpringApplicationRunListener {
/*xxx: called once the environment is prepared, before the ApplicationContext is created */
default void environmentPrepared(ConfigurableBootstrapContext bootstrapContext,
ConfigurableEnvironment environment) {
environmentPrepared(environment);
}
@Deprecated
default void environmentPrepared(ConfigurableEnvironment environment) {
}
}
/*xxx: the only built-in Spring Boot implementation of the SpringApplicationRunListener interface */
public class EventPublishingRunListener implements SpringApplicationRunListener, Ordered {
/*xxx: event multicaster */
private final SimpleApplicationEventMulticaster initialMulticaster;
public EventPublishingRunListener(SpringApplication application, String[] args) {
/*xxx: create the SimpleApplicationEventMulticaster */
this.initialMulticaster = new SimpleApplicationEventMulticaster();
/*xxx: iterate over the ApplicationListeners and attach each to the multicaster */
for (ApplicationListener<?> listener : application.getListeners()) {
this.initialMulticaster.addApplicationListener(listener);
}
}
@Override
public void environmentPrepared(ConfigurableBootstrapContext bootstrapContext,
ConfigurableEnvironment environment) {
this.initialMulticaster.multicastEvent(
new ApplicationEnvironmentPreparedEvent(bootstrapContext, this.application, this.args, environment));
}
}
# Activation flow
/*xxx: listener for the environment-prepared event */
public class BootstrapApplicationListener implements ApplicationListener<ApplicationEnvironmentPreparedEvent>, Ordered {
@Override
/*xxx: the bootstrap listener instantiates a lightweight parent container named "bootstrap" */
public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event) {
context = bootstrapServiceContext(environment, event.getSpringApplication(), configName);
/*xxx: other details omitted... */
/*xxx: add the context customizers to the SpringApplication so they are cached (details omitted) */
apply(context, event.getSpringApplication(), environment);
}
/*xxx: configure and create the bootstrap context container */
private ConfigurableApplicationContext bootstrapServiceContext(ConfigurableEnvironment environment,
final SpringApplication application, String configName) {
/*xxx: instantiate the parent context */
SpringApplicationBuilder builder = new SpringApplicationBuilder().profiles(environment.getActiveProfiles())
.bannerMode(Mode.OFF).environment(bootstrapEnvironment)
.registerShutdownHook(false).logStartupInfo(false).web(WebApplicationType.NONE);
/*xxx: this import selector is what loads the bootstrap configuration */
builder.sources(BootstrapImportSelectorConfiguration.class);
/*xxx: create the bootstrap context container */
final ConfigurableApplicationContext context = builder.run();
return context;
}
}
@Configuration(proxyBeanMethods = false)
@Import(BootstrapImportSelector.class)
/*xxx: spring-cloud-context ships with this configuration-activation mechanism */
public class BootstrapImportSelectorConfiguration {
}
public class BootstrapImportSelector implements EnvironmentAware, DeferredImportSelector {
@Override
public String[] selectImports(AnnotationMetadata annotationMetadata) {
List<String> names = new ArrayList<>(
SpringFactoriesLoader.loadFactoryNames(BootstrapConfiguration.class, classLoader));
/*xxx: other details omitted... */
return names.toArray(new String[0]);
}
}
org.springframework.cloud.bootstrap.BootstrapConfiguration=org.springframework.cloud.bootstrap.config.PropertySourceBootstrapConfiguration
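/*xxx: the core Spring Cloud configuration class; it takes effect as an ApplicationContextInitializer */
/*xxx: note that this class is initialized during the bootstrap context phase */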
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(PropertySourceBootstrapProperties.class)
public class PropertySourceBootstrapConfiguration
implements ApplicationContextInitializer<ConfigurableApplicationContext>, Ordered {
@Autowired(required = false)
/*xxx: property source locators, supplied by Spring Cloud */
private List<PropertySourceLocator> propertySourceLocators = new ArrayList<>();
@Override
/*xxx: core initialization flow: merges multiple configuration sources according to fixed rules */
public void initialize(ConfigurableApplicationContext applicationContext) {
List<PropertySource<?>> composite = new ArrayList<>();
/*xxx: sort all the locators */
AnnotationAwareOrderComparator.sort(this.propertySourceLocators);
/*xxx: collect the property sources that each locator returns */
for (PropertySourceLocator locator : this.propertySourceLocators) {
Collection<PropertySource<?>> source = locator.locateCollection(environment);
List<PropertySource<?>> sourceList = new ArrayList<>();
for (PropertySource<?> p : source) {
sourceList.add(new SimpleBootstrapPropertySource(p));
}
composite.addAll(sourceList);
/*xxx: if any property sources were found, configure them further */
MutablePropertySources propertySources = environment.getPropertySources();
/*xxx: insert the property sources */
insertPropertySources(propertySources, composite);
}
}
}
/*xxx: interface for locating property sources */
public interface PropertySourceLocator {
PropertySource<?> locate(Environment environment);
}
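
The sort-then-merge idea behind `PropertySourceBootstrapConfiguration.initialize` can be illustrated without Spring. The sketch below is a simplified stand-in, not the library's code: `CompositeConfig`, `Locator`, and `locator(...)` are hypothetical names, and it assumes that the source with the lower order value wins when two sources define the same key.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Spring-free sketch of merging ordered property sources. */
final class CompositeConfig {
    private CompositeConfig() {}

    /** Simplified stand-in for a property source locator (assumption of this sketch). */
    interface Locator {
        int order();
        Map<String, String> locate();
    }

    /** Convenience factory for a locator backed by a fixed map. */
    static Locator locator(int order, Map<String, String> values) {
        return new Locator() {
            @Override public int order() { return order; }
            @Override public Map<String, String> locate() { return values; }
        };
    }

    /** Sorts locators by order, then merges; the first source to define a key wins. */
    static Map<String, String> merge(List<Locator> locators) {
        List<Locator> sorted = new ArrayList<>(locators);
        sorted.sort(Comparator.comparingInt(Locator::order));
        Map<String, String> merged = new LinkedHashMap<>();
        for (Locator locator : sorted) {
            locator.locate().forEach(merged::putIfAbsent);
        }
        return merged;
    }
}
```

With one locator of order 0 defining `timeout=1s` and another of order 10 defining `timeout=5s` and `region=eu`, the merged view holds `timeout=1s` and `region=eu`.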
# Common components
- Using Nacos as an example
@Order(0)
public class NacosPropertySourceLocator implements PropertySourceLocator {
@Override
public PropertySource<?> locate(Environment env) {
loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
}
private void loadApplicationConfiguration(
CompositePropertySource compositePropertySource, String dataIdPrefix,
NacosConfigProperties properties, Environment environment) {
loadNacosDataIfPresent(compositePropertySource,
dataIdPrefix + DOT + fileExtension, nacosGroup, fileExtension, true);
}
private void loadNacosDataIfPresent(final CompositePropertySource composite,
final String dataId, final String group, String fileExtension,
boolean isRefreshable) {
//xxx: Nacos-internal loading strategy, omitted...
}
}
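# Activation rules
- Once the dependency is on the classpath it is enabled by default; it is a factory-level configuration.
- It can be switched off through configuration, for example `spring.cloud.nacos.config.enabled=false`; however, some early Spring Boot auto-configurations do not support such a switch, even though the feature is simple.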
# Notes
# Default context name
- The default context name is `application`.
- It can be changed through `spring.application.name`.
- It can also be changed through `context.setId()`.
public class ContextIdApplicationContextInitializer
implements ApplicationContextInitializer<ConfigurableApplicationContext>, Ordered {
@Override
public void initialize(ConfigurableApplicationContext applicationContext) {
ContextId contextId = getContextId(applicationContext);
applicationContext.setId(contextId.getId());
applicationContext.getBeanFactory().registerSingleton(ContextId.class.getName(), contextId);
}
/*xxx: note: in a Spring Cloud environment the parent of the current context is still named application, because Spring Cloud handles it specially */
private ContextId getContextId(ConfigurableApplicationContext applicationContext) {
ApplicationContext parent = applicationContext.getParent();
if (parent != null && parent.containsBean(ContextId.class.getName())) {
return parent.getBean(ContextId.class).createChildId();
}
return new ContextId(getApplicationId(applicationContext.getEnvironment()));
}
private String getApplicationId(ConfigurableEnvironment environment) {
String name = environment.getProperty("spring.application.name");
return StringUtils.hasText(name) ? name : "application";
}
}
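# Default configuration file name
- `spring.config.name` sets the base name of the default configuration file to read.
- The two common values are `application` (Spring Boot) and `bootstrap` (Spring Cloud).
# Default configuration location
- `spring.config.location` sets the location that configuration files are loaded from.
- `spring.config.additional-location` adds external configuration locations.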
# Live configuration refresh
# How it works
/*xxx: context refresher */
public abstract class ContextRefresher {
private RefreshScope scope;
/*xxx: core method: refresh */
public synchronized Set<String> refresh() {
Set<String> keys = refreshEnvironment();
this.scope.refreshAll();
return keys;
}
}
# How singleton beans are created and cached
/*xxx: abstract bean factory */
public abstract class AbstractBeanFactory extends FactoryBeanRegistrySupport implements ConfigurableBeanFactory {
protected <T> T doGetBean(
String name, @Nullable Class<T> requiredType, @Nullable Object[] args, boolean typeCheckOnly)
throws BeansException {
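/*xxx: look up the RootBeanDefinition for the given name; throws if none is found */
RootBeanDefinition mbd = getMergedLocalBeanDefinition(beanName);
/*xxx: singleton scope: create the bean instance itself */
if (mbd.isSingleton()) {
sharedInstance = getSingleton(beanName, () -> {
/*xxx: the bean is created on the first call and cached afterwards */
return createBean(beanName, mbd, args);
});
/*xxx: resolve the actual object for the bean instance; FactoryBeans are handled automatically */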
bean = getObjectForBeanInstance(sharedInstance, name, beanName, mbd);
}
}
}
/*xxx: default singleton bean registry */
public class DefaultSingletonBeanRegistry extends SimpleAliasRegistry implements SingletonBeanRegistry {
/*xxx: this method not only creates the bean but also caches the singleton */
public Object getSingleton(String beanName, ObjectFactory<?> singletonFactory) {
Object singletonObject = this.singletonObjects.get(beanName);
boolean newSingleton = false;
if (singletonObject == null) {
/*xxx: obtain the singleton from the factory, i.e. the createBean call */
singletonObject = singletonFactory.getObject();
newSingleton = true;
}
if (newSingleton) {
/*xxx: a newly created singleton has to be cached */
addSingleton(beanName, singletonObject);
}
return singletonObject;
}
}
# How prototype beans are created and cached
/*xxx: abstract bean factory */
public abstract class AbstractBeanFactory extends FactoryBeanRegistrySupport implements ConfigurableBeanFactory {
protected <T> T doGetBean(
String name, @Nullable Class<T> requiredType, @Nullable Object[] args, boolean typeCheckOnly)
throws BeansException {
/*xxx: prototype scope: a new instance is created on every call; FactoryBeans are handled automatically as well */
if (mbd.isPrototype()) {
Object prototypeInstance = null;
prototypeInstance = createBean(beanName, mbd, args);
bean = getObjectForBeanInstance(prototypeInstance, name, beanName, mbd);
}
}
}
# How custom-scope beans are created and cached
/*xxx: abstract bean factory */
public abstract class AbstractBeanFactory extends FactoryBeanRegistrySupport implements ConfigurableBeanFactory {
protected <T> T doGetBean(
String name, @Nullable Class<T> requiredType, @Nullable Object[] args, boolean typeCheckOnly)
throws BeansException {
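/*xxx: neither singleton nor prototype: handled according to the rules of the specific scope */
/*xxx: an ordinary scope behaves much like prototype; the object is obtained through the scope every time */
String scopeName = mbd.getScope();
Scope scope = this.scopes.get(scopeName);
/*xxx: obtain the object through the scope's get method */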
Object scopedInstance = scope.get(beanName, () -> {
beforePrototypeCreation(beanName);
try {
return createBean(beanName, mbd, args);
}
finally {
afterPrototypeCreation(beanName);
}
});
bean = getObjectForBeanInstance(scopedInstance, name, beanName, mbd);
}
}
# How refresh-scope beans are created and cached
/*xxx: generic scope implementation */
public class GenericScope
implements Scope, BeanFactoryPostProcessor, BeanDefinitionRegistryPostProcessor, DisposableBean {
private ConcurrentMap<String, ReadWriteLock> locks = new ConcurrentHashMap<>();
/*xxx: caches every bean whose scope is the refresh scope */
private BeanLifecycleWrapperCache cache = new BeanLifecycleWrapperCache(new StandardScopeCache());
@Override
public Object get(String name, ObjectFactory<?> objectFactory) {
BeanLifecycleWrapper value = this.cache.put(name, new BeanLifecycleWrapper(name, objectFactory));
this.locks.putIfAbsent(name, new ReentrantReadWriteLock());
try {
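/*xxx: after a context refresh, the destroy method is what matters */
/*xxx: destroy discards the old wrappers, so the next lookup gets a fresh wrapper */
/*xxx: when the context has not been refreshed, the previously cached object is returned */
/*xxx: net effect: beans act as singletons while the configuration is unchanged, and every bean in this scope is rebuilt after it changes */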
return value.getBean();
}
catch (RuntimeException e) {
this.errors.put(name, e);
throw e;
}
}
private static class BeanLifecycleWrapper {
/*xxx: return the locally cached bean if there is one; otherwise create a new bean through the object factory */
/*xxx: that object factory is in fact the core Spring bean factory */
public Object getBean() {
if (this.bean == null) {
synchronized (this.name) {
if (this.bean == null) {
this.bean = this.objectFactory.getObject();
}
}
}
return this.bean;
}
}
}
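
The caching behaviour described in the comments above (singleton-like until a refresh, rebuilt afterwards) can be reproduced in plain Java. This is a minimal sketch under stated assumptions: `RefreshableCache` and its nested `Wrapper` are hypothetical simplifications, not the `GenericScope` implementation, and the sketch leaves out the per-bean read-write locks and destroy callbacks.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical, simplified stand-in for a refresh-style scope cache. */
final class RefreshableCache {

    /** Holds the factory and the lazily created instance for one bean name. */
    private static final class Wrapper {
        private final Supplier<Object> factory;
        private volatile Object bean;

        Wrapper(Supplier<Object> factory) {
            this.factory = factory;
        }

        /** Double-checked lazy creation: the factory runs at most once per wrapper. */
        Object getBean() {
            Object local = bean;
            if (local == null) {
                synchronized (this) {
                    local = bean;
                    if (local == null) {
                        local = factory.get();
                        bean = local;
                    }
                }
            }
            return local;
        }
    }

    private final Map<String, Wrapper> cache = new ConcurrentHashMap<>();

    /** Returns the cached instance, creating it through the factory on first use. */
    Object get(String name, Supplier<Object> factory) {
        return cache.computeIfAbsent(name, n -> new Wrapper(factory)).getBean();
    }

    /** Drops every wrapper, so the next get() rebuilds each bean. */
    void refreshAll() {
        cache.clear();
    }
}
```

Two consecutive `get("x", factory)` calls return the same object; after `refreshAll()`, the next call invokes the factory again and returns a new one.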
# Bean reload flow and its concurrency-safety design
public class RefreshScope extends GenericScope
implements ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, Ordered {
public void refreshAll() {
super.destroy();
this.context.publishEvent(new RefreshScopeRefreshedEvent());
}
}
/*xxx: generic scope implementation */
public class GenericScope
implements Scope, BeanFactoryPostProcessor, BeanDefinitionRegistryPostProcessor, DisposableBean {
@Override
public void destroy() {
Collection<BeanLifecycleWrapper> wrappers = this.cache.clear();
/*xxx: to destroy an object, first acquire its write lock, then perform the destruction */
for (BeanLifecycleWrapper wrapper : wrappers) {
try {
Lock lock = this.locks.get(wrapper.getName()).writeLock();
lock.lock();
try {
wrapper.destroy();
}
finally {
lock.unlock();
}
}
catch (RuntimeException e) {
errors.add(e);
}
}
}
}
# Keeping bean dependencies in sync after a refresh (the hard part)
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Scope("refresh")
@Documented
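/*xxx: beans marked with this annotation can be refreshed at runtime */
public @interface RefreshScope {
/*xxx: with proxyMode TARGET_CLASS, what gets injected is a proxy object */
/*xxx: the context then holds two bean definitions of this type; by default, references resolve to the proxy */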
ScopedProxyMode proxyMode() default ScopedProxyMode.TARGET_CLASS;
}
/*xxx: annotation configuration utility class */
public abstract class AnnotationConfigUtils {
/*xxx: apply the scoped proxy */
static BeanDefinitionHolder applyScopedProxyMode(
ScopeMetadata metadata, BeanDefinitionHolder definition, BeanDefinitionRegistry registry) {
ScopedProxyMode scopedProxyMode = metadata.getScopedProxyMode();
if (scopedProxyMode.equals(ScopedProxyMode.NO)) {
return definition;
}
/*xxx: whether a CGLIB (class-based) proxy is required */
boolean proxyTargetClass = scopedProxyMode.equals(ScopedProxyMode.TARGET_CLASS);
return ScopedProxyCreator.createScopedProxy(definition, registry, proxyTargetClass);
}
}
final class ScopedProxyCreator {
public static BeanDefinitionHolder createScopedProxy(
BeanDefinitionHolder definitionHolder, BeanDefinitionRegistry registry, boolean proxyTargetClass) {
return ScopedProxyUtils.createScopedProxy(definitionHolder, registry, proxyTargetClass);
}
}
public abstract class ScopedProxyUtils {
private static final String TARGET_NAME_PREFIX = "scopedTarget.";
/*xxx: bean definition hijacking */
public static BeanDefinitionHolder createScopedProxy(BeanDefinitionHolder definition,
BeanDefinitionRegistry registry, boolean proxyTargetClass) {
BeanDefinition targetDefinition = definition.getBeanDefinition();
/*xxx: the actual carrier of the proxy object: ScopedProxyFactoryBean */
RootBeanDefinition proxyDefinition = new RootBeanDefinition(ScopedProxyFactoryBean.class);
String targetBeanName = getTargetBeanName(originalBeanName);
targetDefinition.setAutowireCandidate(false);
targetDefinition.setPrimary(false);
/*xxx: register the original (target) bean definition */
registry.registerBeanDefinition(targetBeanName, targetDefinition);
return new BeanDefinitionHolder(proxyDefinition, originalBeanName, definition.getAliases());
}
}
/*xxx: generic scope implementation */
public class GenericScope
implements Scope, BeanFactoryPostProcessor, BeanDefinitionRegistryPostProcessor, DisposableBean {
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
/*xxx: further rewrite bean definitions that @Scope(proxyMode) has already hijacked */
for (String name : registry.getBeanDefinitionNames()) {
BeanDefinition definition = registry.getBeanDefinition(name);
if (definition instanceof RootBeanDefinition) {
RootBeanDefinition root = (RootBeanDefinition) definition;
/*xxx: a definition produced by the original hijacking */
if (root.getDecoratedDefinition() != null && root.hasBeanClass()
&& root.getBeanClass() == ScopedProxyFactoryBean.class) {
if (getName().equals(root.getDecoratedDefinition().getBeanDefinition().getScope())) {
/*xxx: swap in the custom factory bean class */
root.setBeanClass(LockedScopedProxyFactoryBean.class);
root.getConstructorArgumentValues().addGenericArgumentValue(this);
// surprising that a scoped proxy bean definition is not already
// marked as synthetic?
root.setSynthetic(true);
}
}
}
}
}
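/*xxx: this class backs the proxy that singleton beans hold a reference to, and supplies the actual interceptor for it */
/*xxx: it works with a read-write lock: the read lock is taken while a method of the real bean is executing and released when the call completes */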
public static class LockedScopedProxyFactoryBean<S extends GenericScope> extends ScopedProxyFactoryBean
implements MethodInterceptor {
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
Object proxy = getObject();
if (proxy instanceof Advised) {
/*xxx: weave itself in as advice on the proxy */
Advised advised = (Advised) proxy;
advised.addAdvice(0, this);
}
}
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Method method = invocation.getMethod();
Object proxy = getObject();
ReadWriteLock readWriteLock = this.scope.getLock(this.targetBeanName);
Lock lock = readWriteLock.readLock();
lock.lock();
try {
if (proxy instanceof Advised) {
Advised advised = (Advised) proxy;
ReflectionUtils.makeAccessible(method);
return ReflectionUtils.invokeMethod(method, advised.getTargetSource().getTarget(),
invocation.getArguments());
}
return invocation.proceed();
}finally {
lock.unlock();
}
}
}
}
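
The locking scheme above (a read lock around every call through the proxy, a write lock while the target is replaced) can be sketched with a JDK dynamic proxy. This is an illustration only: `LockedProxies`, `readLocked`, and `refresh` are hypothetical names, and the sketch uses `java.lang.reflect.Proxy` on an interface rather than the CGLIB class proxy that `ScopedProxyMode.TARGET_CLASS` implies.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.function.Supplier;

/** Hypothetical sketch: a proxy that holds a read lock for the duration of each call. */
final class LockedProxies {
    private LockedProxies() {}

    /**
     * Wraps the current target behind an interface proxy. The target is looked up
     * on every call, so callers holding the proxy always reach the latest instance.
     */
    @SuppressWarnings("unchecked")
    static <T> T readLocked(Class<T> type, Supplier<? extends T> target, ReadWriteLock lock) {
        InvocationHandler handler = (proxy, method, args) -> {
            lock.readLock().lock();
            try {
                return method.invoke(target.get(), args);
            } catch (InvocationTargetException e) {
                // Rethrow the exception the real method threw.
                throw e.getCause();
            } finally {
                lock.readLock().unlock();
            }
        };
        return (T) Proxy.newProxyInstance(
                LockedProxies.class.getClassLoader(), new Class<?>[] {type}, handler);
    }

    /** Replaces the target under the write lock, waiting for in-flight calls to finish. */
    static void refresh(ReadWriteLock lock, Runnable swap) {
        lock.writeLock().lock();
        try {
            swap.run();
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

A caller keeps one proxy reference for the lifetime of the application; after `refresh(...)` swaps the underlying object, the same proxy transparently delegates to the new instance.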
# Common components
- Using `nacos` as an example
- Note that the Nacos context refresher (`NacosContextRefresher`) and Spring Cloud's `ContextRefresher` are not the same thing.
public class NacosContextRefresher
implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware {
private void registerNacosListener(final String group, final String dataId) {
Listener listener = listenerMap.computeIfAbsent(dataId, i -> new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
applicationContext.publishEvent(
new RefreshEvent(this, null, "Refresh Nacos config"));
}
});
}
}
public class RefreshEventListener implements SmartApplicationListener {
private ContextRefresher refresh;
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof ApplicationReadyEvent) {
handle((ApplicationReadyEvent) event);
}
else if (event instanceof RefreshEvent) {
handle((RefreshEvent) event);
}
}
/*xxx: on receiving a valid event, this listener calls the context refresher */
public void handle(RefreshEvent event) {
if (this.ready.get()) { // don't handle events before app is ready
log.debug("Event received " + event.getEventDesc());
Set<String> keys = this.refresh.refresh();
log.info("Refresh keys changed: " + keys);
}
}
}
/*xxx: context refresher */
public abstract class ContextRefresher {
/*xxx: core method: refresh */
public synchronized Set<String> refresh() {
Set<String> keys = refreshEnvironment();
this.scope.refreshAll();
return keys;
}
public synchronized Set<String> refreshEnvironment() {
Map<String, Object> before = extract(this.context.getEnvironment().getPropertySources());
updateEnvironment();
Set<String> keys = changes(before, extract(this.context.getEnvironment().getPropertySources())).keySet();
/*xxx: after the environment is updated, notify listeners so that beans are rebound */
this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));
return keys;
}
}
public class ConfigDataContextRefresher extends ContextRefresher {
@Override
protected void updateEnvironment() {
/*xxx: update the environment data (omitted) */
}
}
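
`refreshEnvironment()` takes a snapshot of the properties before and after the update and reports the keys that differ. A minimal sketch of such a diff is shown below; `EnvDiff` and `changedKeys` are hypothetical names, and the exact comparison rules of the library's `changes(...)` helper are not reproduced here.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of computing which property keys changed between two snapshots. */
final class EnvDiff {
    private EnvDiff() {}

    /** Returns keys that were added, removed, or whose value differs. */
    static Set<String> changedKeys(Map<String, Object> before, Map<String, Object> after) {
        Set<String> keys = new TreeSet<>();
        for (String key : before.keySet()) {
            // Removed keys and keys with a different value both count as changed.
            if (!after.containsKey(key) || !Objects.equals(before.get(key), after.get(key))) {
                keys.add(key);
            }
        }
        for (String key : after.keySet()) {
            // Keys that exist only in the new snapshot were added.
            if (!before.containsKey(key)) {
                keys.add(key);
            }
        }
        return keys;
    }
}
```

The resulting key set is what an event such as `EnvironmentChangeEvent` would carry to listeners that need to rebind configuration.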
# Service registry
# How service registration works
# Auto-configuration definitions
@Import(AutoServiceRegistrationConfiguration.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
/*xxx: service registration auto-configuration */
public class AutoServiceRegistrationAutoConfiguration {
@Autowired(required = false)
//xxx: top-level interface for automatic service registration
private AutoServiceRegistration autoServiceRegistration;
@PostConstruct
protected void init() {
/*xxx: fails if auto-registration is expected, no AutoServiceRegistration bean is defined, and failFast is set */
if (this.autoServiceRegistration == null && this.properties.isFailFast()) {
throw new IllegalStateException(
"Auto Service Registration has " + "been requested, but there is no AutoServiceRegistration bean");
}
}
}
@EnableConfigurationProperties(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public class AutoServiceRegistrationConfiguration {
}
@Configuration(proxyBeanMethods = false)
public class ServiceRegistryAutoConfiguration {
/*xxx: defines this management endpoint when a ServiceRegistry bean is present */
@ConditionalOnBean(ServiceRegistry.class)
@ConditionalOnClass(Endpoint.class)
protected class ServiceRegistryEndpointConfiguration {
@Autowired(required = false)
/*xxx: the service instance; Spring Cloud registers services per application, so one application represents one service instance */
private Registration registration;
@Bean
@ConditionalOnAvailableEndpoint
/*xxx: endpoint for managing service status */
public ServiceRegistryEndpoint serviceRegistryEndpoint(ServiceRegistry serviceRegistry) {
ServiceRegistryEndpoint endpoint = new ServiceRegistryEndpoint(serviceRegistry);
endpoint.setRegistration(this.registration);
return endpoint;
}
}
}
# Top-level interfaces
- Service instance
public interface ServiceInstance {
default String getInstanceId() {
return null;
}
String getServiceId();
String getHost();
int getPort();
Map<String, String> getMetadata();
URI getUri();
}
/*xxx: a registered service instance */
public interface Registration extends ServiceInstance {
}
- Service registry
/*xxx: service registry */
public interface ServiceRegistry<R extends Registration> {
void register(R registration);
void deregister(R registration);
void setStatus(R registration, String status);
<T> T getStatus(R registration);
/*xxx: lifecycle method */
void close();
}
- Automatic service registration
/*xxx: marker interface for automatic service registration */
public interface AutoServiceRegistration {
}
/*xxx: abstract automatic service registration */
public abstract class AbstractAutoServiceRegistration<R extends Registration>
implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
/*xxx: service registry */
private final ServiceRegistry<R> serviceRegistry;
//xxx: automatic registration requires a ServiceRegistry
protected AbstractAutoServiceRegistration(ServiceRegistry<R> serviceRegistry,
AutoServiceRegistrationProperties properties) {
this.serviceRegistry = serviceRegistry;
this.properties = properties;
}
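/*xxx: service registration starts once the web server has been initialized */
/*xxx: it follows that a service instance is normally a web service, which has an IP, a port, a context path, and so on */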
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
@Deprecated
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
this.port.compareAndSet(0, event.getWebServer().getPort());
this.start();
}
public void start() {
this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
//xxx: register the service automatically
register();
this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
}
protected void register() {
this.serviceRegistry.register(getRegistration());
}
}
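# Service registration: activation rules
- In a Spring Cloud environment, registration takes effect automatically whenever an auto-registration component is defined, and it can be switched off through a property; it is an application-level configuration.
# Service registration: common components
- Using Nacos as an example
@ConditionalOnNacosDiscoveryEnabled /*xxx: whether the conditions for Nacos auto-registration are met; controlled by spring.cloud.nacos.discovery.enabled and satisfied by default */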
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
/*xxx: whether Spring Cloud service registration is allowed; controlled by spring.cloud.service-registry.auto-registration.enabled */
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
AutoServiceRegistrationAutoConfiguration.class })
public class NacosDiscoveryAutoConfiguration {
@Bean
/*xxx: service registry */
public NacosServiceRegistry nacosServiceRegistry(
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
/*xxx: service instance */
public NacosRegistration nacosRegistration(
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(nacosDiscoveryProperties, context);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
/*xxx: automatic service registration */
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
}
# How service discovery works
# Auto-configuration definitions
/*xxx: service discovery auto-configuration */
public class SimpleDiscoveryClientAutoConfiguration implements ApplicationListener<WebServerInitializedEvent> {
@Override
public void onApplicationEvent(WebServerInitializedEvent webServerInitializedEvent) {
port = webServerInitializedEvent.getWebServer().getPort();
if (port > 0) {
simple.getLocal().setHost(inet.findFirstNonLoopbackHostInfo().getHostname());
simple.getLocal().setPort(port);
}
}
@Bean
@Order
/*xxx: Spring Cloud defines a default discovery client that points at the local machine; its priority is low */
public DiscoveryClient simpleDiscoveryClient(SimpleDiscoveryProperties properties) {
return new SimpleDiscoveryClient(properties);
}
}
@Configuration(proxyBeanMethods = false)
@AutoConfigureBefore(SimpleDiscoveryClientAutoConfiguration.class)
/*xxx: gathers all discovery clients */
public class CompositeDiscoveryClientAutoConfiguration {
@Bean
@Primary
public CompositeDiscoveryClient compositeDiscoveryClient(List<DiscoveryClient> discoveryClients) {
return new CompositeDiscoveryClient(discoveryClients);
}
}
# Top-level interface
/*xxx: service registration and discovery */
public interface DiscoveryClient extends Ordered {
int DEFAULT_ORDER = 0;
/*xxx: all instances of the given service */
List<ServiceInstance> getInstances(String serviceId);
/*xxx: list of all known service ids */
List<String> getServices();
@Override
default int getOrder() {
return DEFAULT_ORDER;
}
}
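
How a composite client can combine several ordered discovery clients is sketched below in plain Java. The names `CompositeLookup`, `Client`, and `client(...)` are hypothetical, service instances are reduced to `host:port` strings, and the first-non-empty-result rule is an assumption of this sketch rather than a statement about every version of the library.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: asks ordered clients in turn and returns the first non-empty answer. */
final class CompositeLookup {

    /** Simplified stand-in for a discovery client (assumption of this sketch). */
    interface Client {
        int order();
        List<String> instances(String serviceId);
    }

    /** Convenience factory for a client backed by a fixed table. */
    static Client client(int order, Map<String, List<String>> table) {
        return new Client() {
            @Override public int order() { return order; }
            @Override public List<String> instances(String serviceId) {
                return table.getOrDefault(serviceId, Collections.emptyList());
            }
        };
    }

    private final List<Client> clients;

    CompositeLookup(List<Client> clients) {
        List<Client> sorted = new ArrayList<>(clients);
        // Lower order value means higher priority.
        sorted.sort(Comparator.comparingInt(Client::order));
        this.clients = sorted;
    }

    List<String> instances(String serviceId) {
        for (Client client : clients) {
            List<String> found = client.instances(serviceId);
            if (found != null && !found.isEmpty()) {
                return found;
            }
        }
        return Collections.emptyList();
    }
}
```

A registry-backed client with order 0 therefore answers before a low-priority local fallback, and the fallback is consulted only for services the registry does not know.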
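# Service discovery: activation rules
- It takes effect unconditionally and cannot be switched off through configuration (because it is a compatible fallback);
- It operates at the application level;
# Service discovery: common components
- Using Nacos as an example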
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureBefore({ SimpleDiscoveryClientAutoConfiguration.class,
CommonsClientAutoConfiguration.class })
public class NacosDiscoveryClientAutoConfiguration {
@Bean
@ConditionalOnMissingBean
/*xxx: discovery settings for the registry */
public NacosDiscoveryProperties nacosProperties() {
return new NacosDiscoveryProperties();
}
@Bean
/*xxx: service discovery client */
public DiscoveryClient nacosDiscoveryClient(
NacosDiscoveryProperties discoveryProperties) {
return new NacosDiscoveryClient(discoveryProperties);
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled", matchIfMissing = true)
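/*xxx: service watch; the Nacos registry is centralized, so service state should already be near real-time */
/*xxx: this bean is presumably for actions that must run automatically after services change in the registry */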
public NacosWatch nacosWatch(NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosWatch(nacosDiscoveryProperties);
}
}
public class NacosDiscoveryClient implements DiscoveryClient {
/*xxx: fetch the service instances */
public List<ServiceInstance> getInstances(String serviceId) {
List<Instance> instances = discoveryProperties.namingServiceInstance()
.selectInstances(serviceId, true);
return hostToServiceInstanceList(instances, serviceId);
}
}
public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle {
private final TaskScheduler taskScheduler;
private static ThreadPoolTaskScheduler getTaskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.initialize();
return taskScheduler;
}
@Override
public void start() {
if (this.running.compareAndSet(false, true)) {
/*xxx: by default the watch task runs every 30 seconds */
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
this::nacosServicesWatch, this.properties.getWatchDelay());
}
}
public void nacosServicesWatch() {
this.publisher.publishEvent(
new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
}
}
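# Service discovery: notes
- Unless it is used explicitly, service discovery is only infrastructure and does no actual work;
- As a feature it is always active in a Spring Cloud environment, but ordinary applications are unaware of it;
# Client-side load balancing
# How it works
# Auto-configuration definitions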
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
/*xxx: load-balancer auto-configuration */
public class LoadBalancerAutoConfiguration {
@LoadBalanced
@Autowired(required = false)
/*xxx: collects the RestTemplate beans annotated with @LoadBalanced so that load balancing can be added to them */
private List<RestTemplate> restTemplates = Collections.emptyList();
@Bean
/*xxx: applies the RestTemplate customizers, which act like plugins */
/*xxx: their most notable feature is retry */
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}
@Bean
@ConditionalOnMissingBean
/*xxx: request factory that the load-balancer client and its retry support rely on */
public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
}
@Configuration(proxyBeanMethods = false)
@Conditional(RetryMissingOrDisabledCondition.class)
static class LoadBalancerInterceptorConfig {
@Bean
/*xxx: load-balancer interceptor */
public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
/*xxx: installs the load-balancer interceptor on the RestTemplate */
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
//xxx: other details omitted...
}
# Top-level interfaces
/*xxx: service instance chooser */
public interface ServiceInstanceChooser {
/*xxx: choose a service instance */
ServiceInstance choose(String serviceId);
/*xxx: choose a service instance for a specific request */
<T> ServiceInstance choose(String serviceId, Request<T> request);
}
/*xxx: load-balancer client */
public interface LoadBalancerClient extends ServiceInstanceChooser {
/*xxx: execute the request */
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
/*xxx: rebuild the URI with the chosen instance's host and port */
URI reconstructURI(ServiceInstance instance, URI original);
}
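
The two responsibilities of the interfaces above, choosing an instance and rewriting the URI to point at it, can be sketched with the JDK alone. `RoundRobinChooser` is a hypothetical name, round-robin is only one possible selection strategy (chosen here for illustration), and instances are reduced to plain strings.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of round-robin instance selection plus URI reconstruction. */
final class RoundRobinChooser {
    private final AtomicInteger position = new AtomicInteger();

    /** Picks the next instance in rotation, or null when none are available. */
    String choose(List<String> instances) {
        if (instances.isEmpty()) {
            return null;
        }
        // Mask the sign bit so the index stays non-negative after overflow.
        int pos = position.getAndIncrement() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }

    /** Replaces the logical service name in the URI with a concrete host and port. */
    static URI reconstruct(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

A request to `http://orders/api/v1?x=1` would, after choosing the instance `10.0.0.5:8080`, be sent to `http://10.0.0.5:8080/api/v1?x=1`.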
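# Activation conditions
- It takes effect automatically once a load-balancer client is configured.
- `@LoadBalanced` is itself a qualifier annotation, so the `restTemplates` list in `LoadBalancerAutoConfiguration` collects only the `RestTemplate` beans that carry it; a `RestTemplate` without `@LoadBalanced` is not given the interceptor.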
# Common components
- Using Ribbon as an example, with Nacos as the registry
@RibbonClients(defaultConfiguration = NacosRibbonClientConfiguration.class)
/*xxx: the Ribbon integration mainly supplies a server list; the rest is Ribbon's own business */
public class RibbonNacosAutoConfiguration {
}
@Configuration
@ConditionalOnRibbonNacos
/*xxx: Nacos combined with Ribbon for load balancing, serving the Nacos client */
public class NacosRibbonClientConfiguration {
@Bean
@ConditionalOnMissingBean
/*xxx: server list backed by Nacos */
public ServerList<?> ribbonServerList(IClientConfig config,
NacosDiscoveryProperties nacosDiscoveryProperties) {
NacosServerList serverList = new NacosServerList(nacosDiscoveryProperties);
serverList.initWithNiwsConfig(config);
return serverList;
}
@Bean
@ConditionalOnMissingBean
public NacosServerIntrospector nacosServerIntrospector() {
return new NacosServerIntrospector();
}
}
/*xxx: Nacos server list */
public class NacosServerList extends AbstractServerList<NacosServer> {
/*xxx: fetch the Nacos server list */
private List<NacosServer> getServers() {
/*xxx: query the Nacos naming service for the instances */
List<Instance> instances = discoveryProperties.namingServiceInstance()
.selectInstances(serviceId, true);
return instancesToServerList(instances);
}
}
- Using `loadbalancer` as an example
/*xxx: blocking load-balancer client */
public class BlockingLoadBalancerClient implements LoadBalancerClient {
/*xxx: execute a request against a service */
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
/*xxx: request adapter */
LoadBalancerRequestAdapter<T, DefaultRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(request,
new DefaultRequestContext(request, hint));
/*xxx: choose a service instance */
ServiceInstance serviceInstance = choose(serviceId, lbRequest);
/*xxx: perform the actual request */
return execute(serviceId, serviceInstance, lbRequest);
}
/*xxx: choose a service instance */
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
/*xxx: obtain the load balancer for this service from the factory */
ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
/*xxx: block until the chosen instance is available */
Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
return loadBalancerResponse.getServer();
}
/*xxx: Invoke the service instance and return the actual result */
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request)
throws IOException {
/*xxx: Wrap the service instance */
DefaultResponse defaultResponse = new DefaultResponse(serviceInstance);
/*xxx: Execute the request */
T response = request.apply(serviceInstance);
return response;
}
}
- Auto-configuration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.loadbalancer.config.LoadBalancerAutoConfiguration,\
org.springframework.cloud.loadbalancer.config.BlockingLoadBalancerClientAutoConfiguration,\
org.springframework.cloud.loadbalancer.config.LoadBalancerCacheAutoConfiguration
public class LoadBalancerAutoConfiguration {
@Bean
@ConditionalOnMissingBean
/*xxx: Load-balancing zone configuration */
public LoadBalancerZoneConfig zoneConfig(Environment environment) {
return new LoadBalancerZoneConfig(environment.getProperty("spring.cloud.loadbalancer.zone"));
}
@ConditionalOnMissingBean
@Bean
/*xxx: Definition of the load-balancer client factory */
public LoadBalancerClientFactory loadBalancerClientFactory() {
LoadBalancerClientFactory clientFactory = new LoadBalancerClientFactory();
clientFactory.setConfigurations(this.configurations.getIfAvailable(Collections::emptyList));
return clientFactory;
}
}
@ConditionalOnClass(RestTemplate.class)
/*xxx: Load-balancer client auto-configuration */
public class BlockingLoadBalancerClientAutoConfiguration {
@Bean
@ConditionalOnBean(LoadBalancerClientFactory.class)
@ConditionalOnMissingBean
/*xxx: Blocking load-balancer client */
public LoadBalancerClient blockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory,
LoadBalancerProperties properties) {
return new BlockingLoadBalancerClient(loadBalancerClientFactory, properties);
}
@Configuration
@ConditionalOnClass(RetryTemplate.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
/*xxx: Load-balancing retry configuration */
protected static class BlockingLoadBalancerRetryConfig {
@Bean
@ConditionalOnMissingBean
/*xxx: Retry factory */
LoadBalancedRetryFactory loadBalancedRetryFactory(LoadBalancerProperties properties) {
return new BlockingLoadBalancedRetryFactory(properties);
}
}
}
# Load-balancing flow
public class LoadBalancerAutoConfiguration {
@ConditionalOnMissingBean
@Bean
/*xxx: Definition of the load-balancer client factory */
public LoadBalancerClientFactory loadBalancerClientFactory() {
LoadBalancerClientFactory clientFactory = new LoadBalancerClientFactory();
clientFactory.setConfigurations(this.configurations.getIfAvailable(Collections::emptyList));
return clientFactory;
}
}
/*xxx: Load-balancer client factory, in reactive style */
public class LoadBalancerClientFactory extends NamedContextFactory<LoadBalancerClientSpecification>
implements ReactiveLoadBalancer.Factory<ServiceInstance> {
public LoadBalancerClientFactory() {
/*xxx: Default load-balancer client configuration */
super(LoadBalancerClientConfiguration.class, NAMESPACE, PROPERTY_NAME);
}
}
@ConditionalOnDiscoveryEnabled
/*xxx: Default load-balancer client configuration */
public class LoadBalancerClientConfiguration {
@Bean
@ConditionalOnMissingBean
/*xxx: Default load-balancing strategy: round robin */
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new RoundRobinLoadBalancer(
/*xxx: The service-instance supplier acts as the service registry */
loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
@ConditionalOnReactiveDiscoveryEnabled
@Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER)
/*xxx: Reactive-based strategies */
public static class ReactiveSupportConfiguration {
@Bean
@ConditionalOnBean(ReactiveDiscoveryClient.class)
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "default",
matchIfMissing = true)
/*xxx: Discovery-based service registry; the default option */
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching().build(context);
}
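/*xxx: Discovery-based with zone filtering; active when configured as zone-preference */
/*xxx: Discovery-based with health checks; active when configured as health-check */
/*xxx: Discovery-based with session stickiness; active when configured as request-based-sticky-session */
/*xxx: Discovery-based, preferring the same instance; active when configured as same-instance-preference */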
}
@ConditionalOnBlockingDiscoveryEnabled
@Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER + 1)
/*xxx: Blocking-based strategies */
public static class BlockingSupportConfiguration {
//xxx: Same as above, omitted
}
}
/*xxx: Discovery-based supplier of instances for the load balancer */
public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstanceListSupplier {
private final String serviceId;
/*xxx: Service instance list */
private final Flux<List<ServiceInstance>> serviceInstances;
public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate, Environment environment) {
this.serviceId = environment.getProperty(PROPERTY_NAME);
/*xxx: The Flux is assembled at construction time, but Flux.defer postpones the discovery-client lookup until subscription */
this.serviceInstances = Flux.defer(() -> Flux.just(delegate.getInstances(serviceId)))
.subscribeOn(Schedulers.boundedElastic()).timeout(timeout, Flux.defer(() -> {
logTimeout();
return Flux.just(new ArrayList<>());
})).onErrorResume(error -> {
logException(error);
return Flux.just(new ArrayList<>());
});
}
@Override
public Flux<List<ServiceInstance>> get() {
return serviceInstances;
}
}
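The default strategy above can be pictured with a minimal round-robin sketch. This is a simplified, blocking stand-in written for these notes, not the source of `RoundRobinLoadBalancer`; `RoundRobinSketch` and its plain `Supplier<List<String>>` are assumptions that replace the reactive `ServiceInstanceListSupplier`.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RoundRobinSketch {
    /* Plays the role of the instance supplier: asked for the current list on every call */
    private final Supplier<List<String>> instanceSupplier;
    private final AtomicInteger position = new AtomicInteger(0);

    RoundRobinSketch(Supplier<List<String>> instanceSupplier) {
        this.instanceSupplier = instanceSupplier;
    }

    /* Returns the next instance in rotation, or null when no instance is available */
    String choose() {
        List<String> instances = instanceSupplier.get();
        if (instances.isEmpty()) {
            return null;
        }
        // Mask the sign bit so the index stays non-negative after integer overflow
        int pos = position.getAndIncrement() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }
}
```

Because the list is fetched on every call, instances that appear or disappear in the registry are picked up without rebuilding the balancer.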
# Usage example
@RestController
public class NacosController{
@Autowired
private LoadBalancerClient loadBalancerClient;
@Autowired
private RestTemplate restTemplate;
@GetMapping("/echo/{param}")
public String echoParam(@PathVariable String param){
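ServiceInstance serviceInstance = loadBalancerClient.choose("nacos-provider");
String path = String.format("http://%s:%s/echo/%s",serviceInstance.getHost(),serviceInstance.getPort(),param);
System.out.println(" request path: "+ path);
//xxx: Option 1: call the chosen instance directly; this needs a RestTemplate without @LoadBalanced
//return restTemplate.getForObject(path,String.class);
//xxx: Option 2: address the service by name; this needs a @LoadBalanced RestTemplate and an explicit scheme
return restTemplate.getForObject("http://nacos-provider/echo/" + param,String.class);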
}
}
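When a request is addressed by service name, the host part of the URL has to be swapped for the chosen instance's host and port before the call goes out. The sketch below shows that rewrite with plain `java.net.URI`; `ServiceUriSketch.reconstruct` is a hypothetical helper for illustration and is not the library's own implementation.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class ServiceUriSketch {
    /* Replace the service-id host of the original URI with a concrete host and port */
    static URI reconstruct(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("cannot rebuild URI for " + original, e);
        }
    }
}
```

For example, `reconstruct(URI.create("http://nacos-provider/echo/test"), "10.0.0.1", 8081)` keeps the scheme and path and only changes the authority.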
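# Service invocation
# Starting from openFeign
# The evolution of openFeign
openFeign is a reworked version of netflix's open-source feign project; it is a declarative web service client.
- The original feign project was open-sourced by netflix
- Because the feign code had become dated, netflix decided to rework it, which produced the openFeign project
- Thanks to its simple usage, openFeign gradually became an active open-source project
# Using openFeign (standalone)
- Add the openFeign dependencies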
<dependencies>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-core</artifactId>
<version>11.1</version>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
<version>11.1</version>
</dependency>
<!-- needed for the GsonEncoder and GsonDecoder used when building the client -->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-gson</artifactId>
<version>11.1</version>
</dependency>
</dependencies>
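- Create the client interface
public interface UserServiceClient {
//xxx: Plain feign uses its own @RequestLine contract; @RequestMapping belongs to Spring MVC
@RequestLine("GET /users/{id}")
@Headers("Content-Type: application/json")
User getUserById(@Param("id") Long id);
}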
- Create the feign client
class Test{
void test(){
//xxx: feign-httpclient wraps Apache HttpClient (org.apache.http.client.HttpClient), not java.net.http.HttpClient
HttpClient httpClient = HttpClientBuilder.create().build();
UserServiceClient userServiceClient = Feign.builder()
.client(new feign.httpclient.ApacheHttpClient(httpClient))
.encoder(new GsonEncoder())
.decoder(new GsonDecoder())
.target(UserServiceClient.class, "http://localhost:8080");
}
}
//xxx: GsonEncoder and GsonDecoder handle JSON serialization and deserialization
- Call the service declaratively
class Test{
void test(){
User user = userServiceClient.getUserById(1L);
}
}
# How it works
# Auto-configuration definitions
@ConditionalOnClass(Feign.class)
@Import(DefaultGzipDecoderConfiguration.class)
/*xxx: feign auto-configuration */
public class FeignAutoConfiguration {
@Bean
/*xxx: feign context configuration */
public FeignContext feignContext() {
FeignContext context = new FeignContext();
context.setConfigurations(this.configurations);
return context;
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(CircuitBreaker.class)
@ConditionalOnProperty("feign.circuitbreaker.enabled")
/*xxx: feign circuit-breaker configuration */
protected static class CircuitBreakerPresentFeignTargeterConfiguration {
@Bean
@ConditionalOnMissingBean(CircuitBreakerFactory.class)
/*xxx: Selected depending on whether a circuit-breaker factory is present */
public Targeter defaultFeignTargeter() {
return new DefaultTargeter();
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnBean(CircuitBreakerFactory.class)
/*xxx: Selected depending on whether a circuit-breaker factory is present */
public Targeter circuitBreakerFeignTargeter(CircuitBreakerFactory circuitBreakerFactory) {
return new FeignCircuitBreakerTargeter(circuitBreakerFactory);
}
}
@ConditionalOnClass(ApacheHttpClient.class)
/*xxx: Skipped if the application has already defined a CloseableHttpClient */
@ConditionalOnMissingBean(CloseableHttpClient.class)
@ConditionalOnProperty(value = "feign.httpclient.enabled", matchIfMissing = true)
/*xxx: feign HTTP client configuration */
protected static class HttpClientFeignConfiguration {
@Bean
@ConditionalOnMissingBean(HttpClientConnectionManager.class)
/*xxx: Connection manager */
public HttpClientConnectionManager connectionManager(
ApacheHttpClientConnectionManagerFactory connectionManagerFactory,
FeignHttpClientProperties httpClientProperties) {
final HttpClientConnectionManager connectionManager = connectionManagerFactory.newConnectionManager(
httpClientProperties.isDisableSslValidation(), httpClientProperties.getMaxConnections(),
httpClientProperties.getMaxConnectionsPerRoute(), httpClientProperties.getTimeToLive(),
httpClientProperties.getTimeToLiveUnit(), this.registryBuilder);
this.connectionManagerTimer.schedule(new TimerTask() {
@Override
public void run() {
connectionManager.closeExpiredConnections();
}
}, 30000, httpClientProperties.getConnectionTimerRepeat());
return connectionManager;
}
@Bean
/*xxx: HTTP client configuration */
public CloseableHttpClient httpClient(ApacheHttpClientFactory httpClientFactory,
HttpClientConnectionManager httpClientConnectionManager,
FeignHttpClientProperties httpClientProperties) {
RequestConfig defaultRequestConfig = RequestConfig.custom()
.setConnectTimeout(httpClientProperties.getConnectionTimeout())
.setRedirectsEnabled(httpClientProperties.isFollowRedirects()).build();
this.httpClient = httpClientFactory.createBuilder().setConnectionManager(httpClientConnectionManager)
.setDefaultRequestConfig(defaultRequestConfig).build();
return this.httpClient;
}
@Bean
@ConditionalOnMissingBean(Client.class)
/*xxx: feign client configuration */
public Client feignClient(HttpClient httpClient) {
return new ApacheHttpClient(httpClient);
}
}
}
@Import({ HttpClientFeignLoadBalancerConfiguration.class, OkHttpFeignLoadBalancerConfiguration.class,
DefaultFeignLoadBalancerConfiguration.class })
/*xxx: feign load-balancing configuration */
public class FeignLoadBalancerAutoConfiguration {
}
/*xxx: feign load-balancing configuration based on Apache HttpClient */
class HttpClientFeignLoadBalancerConfiguration {
@Bean
@ConditionalOnMissingBean
@Conditional(OnRetryNotEnabledCondition.class)
/*xxx: Load-balanced client configuration */
public Client feignClient(LoadBalancerClient loadBalancerClient, HttpClient httpClient,
LoadBalancerProperties properties, LoadBalancerClientFactory loadBalancerClientFactory) {
ApacheHttpClient delegate = new ApacheHttpClient(httpClient);
return new FeignBlockingLoadBalancerClient(delegate, loadBalancerClient, properties, loadBalancerClientFactory);
}
}
# Infrastructure definitions
/*xxx: Marks the annotated interface as a feign client */
public @interface FeignClient {
@AliasFor("value")
String name() default "";
//xxx: Explicit URL; takes precedence over name
String url() default "";
/*xxx: Configuration classes used when assembling this client */
Class<?>[] configuration() default {};
/*xxx: Fallback class used for service degradation */
Class<?> fallback() default void.class;
}
@Import(FeignClientsRegistrar.class)
/*xxx: Enables feign client processing */
public @interface EnableFeignClients {
/*xxx: Packages to scan */
String[] basePackages() default {};
/*xxx: Explicitly listed client interfaces */
Class<?>[] clients() default {};
}
/*xxx: Wires up the feign infrastructure */
class FeignClientsRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {
@Override
/*xxx: Register bean definitions */
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
/*xxx: Register the default configuration */
registerDefaultConfiguration(metadata, registry);
/*xxx: Register the feign clients */
registerFeignClients(metadata, registry);
}
private void registerClientConfiguration(BeanDefinitionRegistry registry, Object name, Object configuration) {
/*xxx: Generic feign client specification */
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(FeignClientSpecification.class);
builder.addConstructorArgValue(name);
builder.addConstructorArgValue(configuration);
/*xxx: Register it as a template */
registry.registerBeanDefinition(name + "." + FeignClientSpecification.class.getSimpleName(),
builder.getBeanDefinition());
}
/*xxx: Register feign clients */
public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
/*xxx: Holds all candidate bean definitions */
LinkedHashSet<BeanDefinition> candidateComponents = new LinkedHashSet<>();
Map<String, Object> attrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName());
/*xxx: Explicit listing and scanning are mutually exclusive */
final Class<?>[] clients = attrs == null ? null : (Class<?>[]) attrs.get("clients");
/*xxx: Nothing listed explicitly, so scan */
if (clients == null || clients.length == 0) {
ClassPathScanningCandidateComponentProvider scanner = getScanner();
scanner.setResourceLoader(this.resourceLoader);
/*xxx: Scan the classpath for types annotated with @FeignClient */
scanner.addIncludeFilter(new AnnotationTypeFilter(FeignClient.class));
Set<String> basePackages = getBasePackages(metadata);
for (String basePackage : basePackages) {
candidateComponents.addAll(scanner.findCandidateComponents(basePackage));
}
}
else {
/*xxx: Explicitly listed clients are used directly as candidates */
for (Class<?> clazz : clients) {
candidateComponents.add(new AnnotatedGenericBeanDefinition(clazz));
}
}
/*xxx: Iterate over the candidates */
for (BeanDefinition candidateComponent : candidateComponents) {
/*xxx: Only annotation-based definitions are processed, which guards against wrongly registered beans */
if (candidateComponent instanceof AnnotatedBeanDefinition) {
AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
/*xxx: @FeignClient only works on interfaces; otherwise startup is aborted */
Assert.isTrue(annotationMetadata.isInterface(), "@FeignClient can only be specified on an interface");
Map<String, Object> attributes = annotationMetadata
.getAnnotationAttributes(FeignClient.class.getCanonicalName());
/*xxx: Resolve the client name */
String name = getClientName(attributes);
/*xxx: Register the client configuration */
registerClientConfiguration(registry, name, attributes.get("configuration"));
/*xxx: Register the feign client */
registerFeignClient(registry, annotationMetadata, attributes);
}
}
}
/*xxx: Register a feign client */
private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata,
Map<String, Object> attributes) {
/*xxx: The FeignClient factory bean acts as the actual implementation, similar to how mybatis mappers work */
FeignClientFactoryBean factoryBean = new FeignClientFactoryBean();
factoryBean.setBeanFactory(beanFactory);
factoryBean.setName(name);
factoryBean.setContextId(contextId);
factoryBean.setType(clazz);
BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(clazz, () -> {
/*xxx: Set url, path and similar properties on the factory; they stay empty when not configured */
factoryBean.setUrl(getUrl(beanFactory, attributes));
factoryBean.setPath(getPath(beanFactory, attributes));
if (fallback != null) {
/*xxx: Set the fallback class */
factoryBean.setFallback(fallback instanceof Class ? (Class<?>) fallback
: ClassUtils.resolveClassName(fallback.toString(), null));
}
/*xxx: Obtain the proxy bean from the factory */
return factoryBean.getObject();
});
/*xxx: Register the proxy's bean definition with the container */
BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, new String[] { alias });
BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}
}
/*xxx: Factory for feign client proxies */
public class FeignClientFactoryBean
implements FactoryBean<Object>, InitializingBean, ApplicationContextAware, BeanFactoryAware {
@Override
/*xxx: Return the actual bean */
public Object getObject() {
return getTarget();
}
/*xxx: Build the feign client proxy */
<T> T getTarget() {
/*xxx: Look up the feign context */
FeignContext context = beanFactory != null ? beanFactory.getBean(FeignContext.class)
: applicationContext.getBean(FeignContext.class);
/*xxx: Obtain the feign builder from the feign context */
Feign.Builder builder = feign(context);
/*xxx: If no url attribute is configured, derive the url from name */
if (!StringUtils.hasText(url)) {
/*xxx: If the name does not start with http, prepend the scheme */
if (!name.startsWith("http")) {
url = "http://" + name;
}
else {
url = name;
}
url += cleanPath();
/*xxx: Build the proxy on top of the load-balanced client, so instances are chosen by the load balancer */
return (T) loadBalance(builder, context, new HardCodedTarget<>(type, name, url));
}
/*xxx: If a url is configured, normalize it */
if (StringUtils.hasText(url) && !url.startsWith("http")) {
url = "http://" + url;
}
String url = this.url + cleanPath();
/*xxx: Look up the feign client */
Client client = getOptional(context, Client.class);
Targeter targeter = get(context, Targeter.class);
/*xxx: Let the targeter create and return the proxy for the interface */
return (T) targeter.target(this, builder, context, new HardCodedTarget<>(type, name, url));
}
protected <T> T get(FeignContext context, Class<T> type) {
/*xxx: Fetch the matching instance from the feign context */
T instance = context.getInstance(contextId, type);
if (instance == null) {
throw new IllegalStateException("No bean found of type " + type + " for " + contextId);
}
return instance;
}
}
/*xxx: Targeter: turns the Feign.Builder and the hard-coded target (type, name, url) into the client proxy */
public interface Targeter {
/*xxx: Obtain the dynamic proxy */
<T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context,
Target.HardCodedTarget<T> target);
}
class DefaultTargeter implements Targeter {
@Override
/*xxx: Default implementation */
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context,
Target.HardCodedTarget<T> target) {
/*xxx: In essence, the proxy comes straight from the feign builder */
return feign.target(target);
}
}
class FeignCircuitBreakerTargeter implements Targeter {
@Override
/*xxx: Obtain a proxy with circuit-breaker support */
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context,
Target.HardCodedTarget<T> target) {
FeignCircuitBreaker.Builder builder = (FeignCircuitBreaker.Builder) feign;
return builder(name, builder).target(target);
}
private FeignCircuitBreaker.Builder builder(String feignClientName, FeignCircuitBreaker.Builder builder) {
return builder.circuitBreakerFactory(this.circuitBreakerFactory).feignClientName(feignClientName);
}
}
/*xxx: feign-based circuit breaker */
public final class FeignCircuitBreaker {
public static Builder builder() {
return new Builder();
}
public static final class Builder extends Feign.Builder {
//xxx: Body omitted...
}
}
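What the targeter ultimately hands back is a JDK dynamic proxy for the client interface. The sketch below shows that idea in isolation: an invocation handler reads an annotation on the called method and turns the call into a request description. `SketchRequestLine`, `SketchUserClient` and `DeclarativeClientSketch` are hypothetical names, and the handler only builds a string instead of sending HTTP, so this is an illustration of the mechanism rather than feign's implementation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/* Hypothetical annotation standing in for a request-line style contract */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SketchRequestLine {
    String value();
}

/* Hypothetical declarative client interface */
interface SketchUserClient {
    @SketchRequestLine("GET /users/{id}")
    String getUserById(long id);
}

final class DeclarativeClientSketch {
    /* Create a proxy that renders each annotated call as "<METHOD> <baseUrl><path>" */
    @SuppressWarnings("unchecked")
    static <T> T target(Class<T> type, String baseUrl) {
        InvocationHandler handler = (proxy, method, args) -> {
            SketchRequestLine line = method.getAnnotation(SketchRequestLine.class);
            if (line == null) {
                throw new UnsupportedOperationException(method.getName());
            }
            String[] parts = line.value().split(" ", 2);
            // Only a single {id} placeholder is handled in this sketch
            String path = parts[1].replace("{id}", String.valueOf(args[0]));
            return parts[0] + " " + baseUrl + path;
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] { type }, handler);
    }
}
```

A circuit-breaker targeter follows the same shape and differs only in which invocation handler ends up behind the proxy.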
# Feign load-balancing flow
@Import({ HttpClientFeignLoadBalancerConfiguration.class, OkHttpFeignLoadBalancerConfiguration.class,
DefaultFeignLoadBalancerConfiguration.class })
/*xxx: feign load-balancing configuration */
public class FeignLoadBalancerAutoConfiguration {
}
@ConditionalOnBean({ LoadBalancerClient.class, LoadBalancerClientFactory.class })
@ConditionalOnProperty(value = "feign.httpclient.enabled", matchIfMissing = true)
@Import(HttpClientFeignConfiguration.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
/*xxx: feign load-balancing configuration based on Apache HttpClient */
class HttpClientFeignLoadBalancerConfiguration {
@Bean
@ConditionalOnMissingBean
@Conditional(OnRetryNotEnabledCondition.class)
/*xxx: Load-balanced client configuration */
public Client feignClient(LoadBalancerClient loadBalancerClient, HttpClient httpClient,
LoadBalancerProperties properties, LoadBalancerClientFactory loadBalancerClientFactory) {
ApacheHttpClient delegate = new ApacheHttpClient(httpClient);
return new FeignBlockingLoadBalancerClient(delegate, loadBalancerClient, properties, loadBalancerClientFactory);
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
@ConditionalOnBean(LoadBalancedRetryFactory.class)
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.retry.enabled", havingValue = "true",
matchIfMissing = true)
/*xxx: Load-balanced feign client with retry; it ranks below ribbon, so ribbon's load balancing is preferred when present */
public Client feignRetryClient(LoadBalancerClient loadBalancerClient, HttpClient httpClient,
LoadBalancedRetryFactory loadBalancedRetryFactory, LoadBalancerProperties properties,
LoadBalancerClientFactory loadBalancerClientFactory) {
ApacheHttpClient delegate = new ApacheHttpClient(httpClient);
return new RetryableFeignBlockingLoadBalancerClient(delegate, loadBalancerClient, loadBalancedRetryFactory,
properties, loadBalancerClientFactory);
}
}
/*xxx: Retryable load-balanced feign client */
public class RetryableFeignBlockingLoadBalancerClient implements Client {
/*xxx: Load-balancer client */
private final LoadBalancerClient loadBalancerClient;
/*xxx: Load-balancer client factory */
private final LoadBalancerClientFactory loadBalancerClientFactory;
@Override
/*xxx: feign call with load balancing */
public Response execute(Request request, Request.Options options) throws IOException {
//xxx: Obtain the response through the load balancer; body omitted
}
}
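Since the body of `execute` is omitted above, here is a minimal sketch of the general retry-on-another-instance idea. It is an assumption about the overall shape only, not the code of `RetryableFeignBlockingLoadBalancerClient`; `RetryingCallSketch`, the string-based instances and the `Function` request are all hypothetical simplifications.

```java
import java.util.List;
import java.util.function.Function;

final class RetryingCallSketch {
    /* Try the request against successive instances until one succeeds or attempts run out */
    static <T> T call(List<String> instances, int maxAttempts, Function<String, T> request) {
        if (instances.isEmpty() || maxAttempts < 1) {
            throw new IllegalArgumentException("need at least one instance and one attempt");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            // Move on to the next instance on every attempt
            String instance = instances.get(attempt % instances.size());
            try {
                return request.apply(instance);
            } catch (RuntimeException e) {
                last = e; // remember the failure and retry
            }
        }
        throw last;
    }
}
```

The real client additionally consults a retry policy to decide which failures and status codes are worth retrying, which this sketch leaves out.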
# feign circuit-breaker flow
Takes effect only after the property is set explicitly: feign.circuitbreaker.enabled=true
@Configuration(proxyBeanMethods = false)
/*xxx: Default feign client configuration class */
public class FeignClientsConfiguration {
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(CircuitBreaker.class)
@ConditionalOnProperty("feign.circuitbreaker.enabled")
/*xxx: feign circuit-breaker configuration; must be enabled explicitly */
protected static class CircuitBreakerPresentFeignBuilderConfiguration {
@Bean
@Scope("prototype")
@ConditionalOnMissingBean
@ConditionalOnBean(CircuitBreakerFactory.class)
/*xxx: feign builder backed by a circuit-breaker factory */
public Feign.Builder circuitBreakerFeignBuilder() {
return FeignCircuitBreaker.builder();
}
}
}
/*xxx: feign-based circuit breaker */
public final class FeignCircuitBreaker {
/*xxx: Circuit-breaker factory */
private CircuitBreakerFactory circuitBreakerFactory;
/*xxx: feign builder with circuit-breaker support */
public static final class Builder extends Feign.Builder {
@Override
/*xxx: Build the target */
public <T> T target(Target<T> target) {
return build(null).newInstance(target);
}
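/*xxx: Wraps the circuit-breaker behaviour in an invocation handler; the breaker itself is supplied by a concrete component */
/*xxx: The fallback factory is passed in for later use */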
public Feign build(final FallbackFactory<?> nullableFallbackFactory) {
super.invocationHandlerFactory((target, dispatch) -> new FeignCircuitBreakerInvocationHandler(
circuitBreakerFactory, feignClientName, target, dispatch, nullableFallbackFactory));
return super.build();
}
}
}
/*xxx: Circuit-breaker invocation handler */
class FeignCircuitBreakerInvocationHandler implements InvocationHandler {
/*xxx: Circuit-breaker factory */
private final CircuitBreakerFactory factory;
@Override
/*xxx: Invoke through the proxy */
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
/*xxx: Circuit name */
String circuitName = this.feignClientName + "_" + method.getName();
/*xxx: Create the circuit breaker from the circuit name */
CircuitBreaker circuitBreaker = this.factory.create(circuitName);
/*xxx: Run through the circuit breaker; normal calls pass through it too (construction of supplier is omitted here) */
return circuitBreaker.run(supplier);
}
}
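# Activation conditions
- Takes effect automatically wherever feign is present;
# Circuit breaking
# How it works
# Auto-configuration definitions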
@ConditionalOnProperty(name = { "spring.cloud.circuitbreaker.resilience4j.enabled",
"spring.cloud.circuitbreaker.resilience4j.blocking.enabled" }, matchIfMissing = true)
/*xxx: resilience4j auto-configuration */
public class Resilience4JAutoConfiguration {
@Bean
@ConditionalOnMissingBean(CircuitBreakerFactory.class)
/*xxx: Circuit-breaker factory */
public Resilience4JCircuitBreakerFactory resilience4jCircuitBreakerFactory(
CircuitBreakerRegistry circuitBreakerRegistry, TimeLimiterRegistry timeLimiterRegistry,
@Autowired(required = false) Resilience4jBulkheadProvider bulkheadProvider) {
/*xxx: Depends on the circuit-breaker registry, which resilience4j loads itself */
Resilience4JCircuitBreakerFactory factory = new Resilience4JCircuitBreakerFactory(circuitBreakerRegistry,
timeLimiterRegistry, bulkheadProvider);
customizers.forEach(customizer -> customizer.customize(factory));
return factory;
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Bulkhead.class)
@ConditionalOnProperty(value = "spring.cloud.circuitbreaker.bulkhead.resilience4j.enabled", matchIfMissing = true)
/*xxx: Circuit-breaker bulkhead configuration */
public static class Resilience4jBulkheadConfiguration {
@Autowired(required = false)
private List<Customizer<Resilience4jBulkheadProvider>> bulkheadCustomizers = new ArrayList<>();
@Bean
public Resilience4jBulkheadProvider bulkheadProvider(ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry,
BulkheadRegistry bulkheadRegistry) {
Resilience4jBulkheadProvider resilience4jBulkheadProvider = new Resilience4jBulkheadProvider(
threadPoolBulkheadRegistry, bulkheadRegistry);
bulkheadCustomizers.forEach(customizer -> customizer.customize(resilience4jBulkheadProvider));
return resilience4jBulkheadProvider;
}
}
}
# Infrastructure definitions
- Circuit breaker
/*xxx: Circuit breaker */
public interface CircuitBreaker {
default <T> T run(Supplier<T> toRun) {
return run(toRun, throwable -> {
throw new NoFallbackAvailableException("No fallback available.", throwable);
});
};
/*xxx: Runs toRun and invokes the fallback callback on failure */
<T> T run(Supplier<T> toRun, Function<Throwable, T> fallback);
}
- Circuit-breaker factory
/*xxx: Circuit-breaker factory */
public abstract class CircuitBreakerFactory<CONF, CONFB extends ConfigBuilder<CONF>>
extends AbstractCircuitBreakerFactory<CONF, CONFB> {
/*xxx: Create a circuit breaker */
public abstract CircuitBreaker create(String id);
}
/*xxx: Base class of the circuit-breaker factory */
public abstract class AbstractCircuitBreakerFactory<CONF, CONFB extends ConfigBuilder<CONF>> {
/*xxx: Circuit-breaker configurations */
private final ConcurrentHashMap<String, CONF> configurations = new ConcurrentHashMap<>();
}
# resilience4j circuit breaker
# Standalone usage
- Define the circuit-breaker configuration
class Test{
public void test(){
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.waitDurationInOpenState(Duration.ofMillis(1000))
.ringBufferSizeInHalfOpenState(2)
.ringBufferSizeInClosedState(2)
.build();
}
}
- Create the circuit breaker
class Test{
public void test(){
CircuitBreaker circuitBreaker = CircuitBreaker.of("myCircuitBreaker", circuitBreakerConfig);
}
}
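- Wrap the fault-prone method with the circuit breaker
class Test{
public void test(){
//xxx: A no-argument call is a Supplier, so decorateSupplier is used; the decorated supplier must then be invoked
Supplier<String> decorated = CircuitBreaker.decorateSupplier(circuitBreaker, () -> myService.doSomething());
String result = decorated.get();
}
}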
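# How it works
/*xxx: Circuit breaker implemented with resilience4j */
public class Resilience4JCircuitBreaker implements CircuitBreaker {
/*xxx: Used to implement service degradation */
public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
/*xxx: Submit to a worker thread; toRun is the method that actually gets executed */
Supplier<Future<T>> futureSupplier = () -> executorService.submit(toRun::get);
/*xxx: Wrap the future with the time limiter (creation of timeLimiter is omitted in this excerpt) */
Callable<T> restrictedCall = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier);
/*xxx: Fetch the circuit breaker from the registry */
io.github.resilience4j.circuitbreaker.CircuitBreaker defaultCircuitBreaker = registry.circuitBreaker(id,
circuitBreakerConfig);
circuitBreakerCustomizer.ifPresent(customizer -> customizer.customize(defaultCircuitBreaker));
/*xxx: In essence, the decorator pattern is used to enhance the original method */
if (bulkheadProvider != null) {
/*xxx: This is the branch that actually runs when a bulkhead provider exists */
return bulkheadProvider.run(id, toRun, fallback, defaultCircuitBreaker, timeLimiter);
}
else {
/*xxx: Run the circuit-breaker logic; on failure or timeout the fallback callback is invoked, which gives the circuit-breaking effect */
Callable<T> callable = io.github.resilience4j.circuitbreaker.CircuitBreaker
.decorateCallable(defaultCircuitBreaker, restrictedCall);
return Try.of(callable::call).recover(fallback).get();
}
}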
}
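To make the `run(toRun, fallback)` contract concrete, the following is a deliberately small circuit breaker written for these notes. It is a sketch under stated assumptions: it opens after a number of consecutive failures, whereas resilience4j works with a failure rate over a window, and `CountingBreakerSketch` with its injected clock is a hypothetical class, not part of any library.

```java
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

final class CountingBreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long waitMillis;
    private final LongSupplier clock; // injected so the wait time can be tested deterministically
    private State state = State.CLOSED;
    private int consecutiveFailures;
    private long openedAt;

    CountingBreakerSketch(int failureThreshold, long waitMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.waitMillis = waitMillis;
        this.clock = clock;
    }

    /* Current state; an open breaker becomes half-open once the wait time has elapsed */
    synchronized State state() {
        if (state == State.OPEN && clock.getAsLong() - openedAt >= waitMillis) {
            state = State.HALF_OPEN;
        }
        return state;
    }

    /* Run the call, or short-circuit to the fallback while the breaker is open */
    <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
        if (state() == State.OPEN) {
            return fallback.apply(new IllegalStateException("circuit open"));
        }
        try {
            T result = toRun.get();
            onSuccess();
            return result;
        } catch (RuntimeException e) {
            onFailure();
            return fallback.apply(e);
        }
    }

    private synchronized void onSuccess() {
        consecutiveFailures = 0;
        state = State.CLOSED;
    }

    private synchronized void onFailure() {
        consecutiveFailures++;
        // A failed trial call in half-open state reopens the breaker immediately
        if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
            state = State.OPEN;
            openedAt = clock.getAsLong();
        }
    }
}
```

Note that successful calls also pass through `run`, which matches the comment in the feign invocation handler that normal execution goes through the breaker as well.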
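# Activation conditions
- Implemented through the proxy of the service-invocation layer
- Takes effect automatically when an implementing component is present, and can be switched off through configuration;
# Common components
- Taking openFeign as an example
@Configuration(proxyBeanMethods = false)
/*xxx: Default feign client configuration class */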
public class FeignClientsConfiguration {
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(CircuitBreaker.class)
@ConditionalOnProperty("feign.circuitbreaker.enabled")
/*xxx: feign circuit-breaker configuration; must be enabled explicitly */
protected static class CircuitBreakerPresentFeignBuilderConfiguration {
@Bean
@Scope("prototype")
@ConditionalOnMissingBean({ Feign.Builder.class, CircuitBreakerFactory.class })
public Feign.Builder defaultFeignBuilder(Retryer retryer) {
return Feign.builder().retryer(retryer);
}
@Bean
@Scope("prototype")
@ConditionalOnMissingBean
@ConditionalOnBean(CircuitBreakerFactory.class)
/*xxx: feign builder backed by a circuit-breaker factory */
public Feign.Builder circuitBreakerFeignBuilder() {
return FeignCircuitBreaker.builder();
}
}
}
/*xxx: feign-based circuit breaker */
public final class FeignCircuitBreaker {
/*xxx: feign builder with circuit-breaker support */
public static final class Builder extends Feign.Builder {
@Override
/*xxx: Build the target */
public <T> T target(Target<T> target) {
return build(null).newInstance(target);
}
/*xxx: Wraps the circuit-breaker behaviour in an invocation handler; the breaker itself is supplied by a concrete component */
public Feign build(final FallbackFactory<?> nullableFallbackFactory) {
super.invocationHandlerFactory((target, dispatch) -> new FeignCircuitBreakerInvocationHandler(
circuitBreakerFactory, feignClientName, target, dispatch, nullableFallbackFactory));
return super.build();
}
}
}
# Microservice gateway
Because too much background knowledge is involved, see the springWebflux chapter for details.