# 响应式编程

# java的响应式发展历程

  • java1.1中,引入了ObserverObservable接口,提供了一种基本的观察者模式实现方式
  • java5中,引入了并发包java.util.concurrent,其中包括了FutureExecutor接口等,使得java程序可以更好地处理异步任务
  • java8中,引入了Stream API,提供了一种基于函数式编程的流式处理,使得数据处理更加高效和简洁
  • java9中,引入了Flow API,提供了一种基于反应流的编程方式,使得程序可以更加响应式地处理数据流
  • java10中,引入了Var关键字,可以更加简洁地处理数据类型,从而更好的支持函数式编程
  • java11中,引入了HttpClient API,提供了一种基于异步IOhttp客户端实现,可以更好的处理网络请求;

# reactor响应式编程

  • Reactor 1.x,发布于2013年,基于java6,提供基础的响应式编程功能,提供了FluxMono来处理数据流
  • Reactor 2.x,发布于2014年,基于java7,内部集成了spring框架
  • Reactor 3.x,发布于2016年,基于java8,引入了函数式编程的思想,同时支持异步IO和反应流编程,能够更好的处理高负载和高并发;
  • Reactor 3.3,发布于2018年,实现了对java9的支持,包括支持java9的Flow API;
  • Reactor 3.4,发布于2020年,实现了对java11的支持,同时增加了对Kotlin和Scala等语言的支持;

# reactor源码解读

# 响应式流规范

/*xxx: 响应式编程接口, 发布者*/
    /*xxx: 表示一种能够产生数据流的组件*/
public interface Publisher<T> {
 	/*xxx: 订阅*/
    public void subscribe(Subscriber<? super T> s);
}
/*xxx: 响应式编程接口 订阅者*/
    /*xxx: 表示一种能够响应数据流中的数据并进行相应处理的组件*/
public interface Subscriber<T> {
 	/*xxx: 订阅回调 */
    public void onSubscribe(Subscription s);
    
    /*xxx: 事件下一步处理*/
    public void onNext(T t);
    
    /*xxx: 出错回调 */
    public void onError(Throwable t);
    
    public void onComplete();
}
/*xxx: 订阅控制*/
    /*xxx: 表示一种能够控制数据流的组件*/
public interface Subscription {
 	/*xxx: 触发流处理 */
    /*xxx: 请求Publisher发送指定数量的元素, Publisher会根据请求的数量发送元素, 知道满足请求或者数据流被处理完成*/
    public void request(long n);
    
    /*xxx: 订阅者在处理完所有的元素后,应该调用该方法通知Publisher取消订阅,以避免资源浪费和内存泄漏*/
    public void cancel();
}
/*xxx: 处理器,既是消费者,也是订阅者*/
    /*xxx: 表示一种能够接收输入数据,并产出输出数据的组件*/
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

# flux响应式编程规范

/*xxx: reactor 发布者*/
public interface CorePublisher<T> extends Publisher<T> {
 	/*xxx: 订阅*/
	void subscribe(CoreSubscriber<? super T> subscriber);   
}
/*xxx: 订阅者*/
public interface CoreSubscriber<T> extends Subscriber<T> {
 	 @Override
	void onSubscribe(Subscription s);  
}
/*xxx: 多元素反应堆 接口*/
public abstract class Flux<T> implements CorePublisher<T> {
 	/*xxx: 构造反应堆流实例*/
	public static <T> Flux<T> just(T data) {
		return onAssembly(new FluxJust<>(data));
	}
    
    /*xxx: 派发事件 */
	public final Flux<T> publishOn(Scheduler scheduler, int prefetch) {
		return publishOn(scheduler, true, prefetch);
	}

    /*xxx: 派发事件 */
	final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch, int lowTide) {
     	/*xxx: 派发事件 */
		return onAssembly(new FluxPublishOn<>(this, scheduler, delayError, prefetch, lowTide, Queues.get(prefetch))); 
    }
    
    /*xxx: 通过消费者进行订阅*/
	public final Disposable subscribe(Consumer<? super T> consumer) {
		Objects.requireNonNull(consumer, "consumer");
		return subscribe(consumer, null, null);
	}
}
/*xxx:发送单个事件之后即完成的 反应堆流*/
final class FluxJust<T> extends Flux<T>
		implements Fuseable.ScalarCallable<T>, Fuseable,SourceProducer<T> {

        /*xxx: 订阅事件*/
	public void subscribe(final CoreSubscriber<? super T> actual) {
		/*xxx: scala订阅控制 */
		actual.onSubscribe(Operators.scalarSubscription(actual, value, "just"));
	}
    
}
/*xxx: 发送定额数组事件 的反应堆流*/
final class FluxArray<T> extends Flux<T> implements Fuseable, SourceProducer<T> {
 	
    public static <T> void subscribe(CoreSubscriber<? super T> s, T[] array) {
     	/*xxx: 数组订阅控制*/
		s.onSubscribe(new ArraySubscription<>(s, array));   
    }
    
    /*xxx: 订阅事件 */
	public void subscribe(CoreSubscriber<? super T> actual) {
		subscribe(actual, array);
	}
    
    
    /*xxx: 数组订阅控制 */
	static final class ArraySubscription<T>
			implements InnerProducer<T>, SynchronousSubscription<T> {
     	
        @Override /*xxx: 进行订阅请求 */
		public void request(long n) {
			if (Operators.validate(n)) {
				if (Operators.addCap(REQUESTED, this, n) == 0) {
					if (n == Long.MAX_VALUE) {
						fastPath();
					}
					else {
						slowPath(n);
					}
				}
			}
		}
        
        void fastPath() {
			final T[] a = array;
			final int len = a.length;
			final Subscriber<? super T> s = actual;

			for (int i = index; i != len; i++) {
				if (cancelled) {
					return;
				}

				T t = a[i];

				if (t == null) {
					s.onError(new NullPointerException("The " + i + "th array element was null"));
					return;
				}
				/*xxx: 调用 onNext回调*/
				s.onNext(t);
			}
			if (cancelled) {
				return;
			} /*xxx: 调用流处理完成的回调*/
			s.onComplete();
		}
        
    }
}
public class FluxExample {
    public static void main(String[] args) {
        //xxx: 除了消费者订阅外,还可以通过订阅者订阅
        /*xxx: 消费者,后续也是通过被包装为订阅者实现的 */
        Flux.just("hello","world").subscribe((t)->{

            System.out.println("consumer callback");
            System.out.println(t);
        });

        Flux.just("hi","reactor").subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("complete");
            }
        });
    }
}

# mono响应式编程规范

public abstract class Mono<T> implements CorePublisher<T> {
 	   //xxx: 略...
}
final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {
 	public final void onSubscribe(Subscription s) {
		if (Operators.validate(subscription, s)) {
			this.subscription = s;

			if (subscriptionConsumer != null) {
				try {
					subscriptionConsumer.accept(s);
				}
				catch (Throwable t) {
					Exceptions.throwIfFatal(t);
					s.cancel();
					onError(t);
				}
			}
			else { 	/*xxx: 订阅回调触发 request,触发 流操作*/
				s.request(Long.MAX_VALUE);
			}

		}
	}
    
    @Override /*xxx: 流操作回调*/
	public final void onNext(T x) {
     	/*xxx: 消费者处理*/
		consumer.accept(x);   
    }
   
}
public class MonoExample {
    public static void main(String[] args) {
        Mono.just("hello").subscribe((s)->{
            System.out.println(s);
        });

        Mono.just("hello").subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("complete");
            }
        });
    }
}

# reactor原理解读(个人理解)

  • 需要处理的数据,抽象为事件
  • 响应式编程中,发布者用于产生事件源,由Publisher接口定义,其中,Flux与Mono是该接口的两种实现;
  • 订阅者可以订阅事件,并进行流式消费,订阅者订阅事件,是通过与发布者交互完成的
  • 订阅者一旦订阅完成,即开始处理事件,处理事件是通过Subscription进行控制的,主要体现在request方法
  • request方法执行后,会调用订阅者的onNext回调接口,该接口用于消费事件,并且可以将处理的结果传递给下游订阅者继续处理;

# 命名由来

  • Flux与Mono响应式编程模型,来源于化学中的反应堆模型
  • Mono源于单个反应堆的概念,因此Mono代表单个元素的流
  • Flux源于连续反应堆的概念,因此Flux代表多个元素的流
  • 化学中,反应堆是用来进行化学反应的设备或容器;

# 个人解读

  • 编程中,从以前的同步思路,转换为异步思路,可以形象理解为方法到这了,但是不一定立即执行,即有需要才执行

  • 通过事件机制,使得跨线程沟通协作变得简单,这是其尤为重要的特性

# Netty响应式

# Netty官方源码整合

/*xxx: 传输层,用于处理网络传输层的相关逻辑, 包括TCP和UDP协议的编解码,数据读写,连接管理 */
public abstract class Transport<T extends Transport<T, C>, C extends TransportConfig> {
 	//xxx: 略...   
}
public abstract class ServerTransport<T extends  ServerTransport<T, CONF>,CONF extends ServerTransportConfig<CONF>> extends Transport<T, CONF> {
     
    /*xxx: 绑定地址 */
	public Mono<? extends DisposableServer> bind() {
     	/*xxx: 创建Mono反应堆 */
		Mono<? extends DisposableServer> mono =  Mono.create(sink -> {
            
            /*xxx: 处理连接请求,传入了 信道处理器参数 */
			Acceptor acceptor = new Acceptor(config.childEventLoopGroup(), config.channelInitializer(childObs, null, true),
					config.childOptions, config.childAttrs, isDomainSocket);
            
            /*xxx: 通过sink参数,贯穿控制 */
			if (local instanceof DomainSocketAddress) {
				isDomainSocket = true;
				disposableServer = new UdsDisposableBind(sink, config, local);
			}
			else {
				disposableServer = new InetDisposableBind(sink, config, local);
			}
            
            /*xxx: 订阅并启动流*/
			TransportConnector.bind(config, new AcceptorInitializer(acceptor), local, isDomainSocket).subscribe(disposableServer);
            
        };
                                                             if (config.doOnBind() != null) {
			/*xxx: 绑定成功后,订阅并启动流*/
			mono = mono.doOnSubscribe(s -> config.doOnBind().accept(config));
		}
                                                             return mono;
    }
                                                             
                                                        /*xxx: 绑定监听*/
	public final DisposableServer bindNow() {
		return bindNow(Duration.ofSeconds(45));
	}
                                                             
                                                         /*xxx:处理连接请求  */
	static class Acceptor extends ChannelInboundHandlerAdapter {
     	   
        /*xxx: 信道读取事件 */
		public void channelRead(ChannelHandlerContext ctx, Object msg) {
         	final Channel child = (Channel) msg;

			/*xxx: 添加信道处理器 */
			child.pipeline().addLast(childHandler);
            
            childGroup.register(child).addListener((ChannelFutureListener) future -> {
					if (!future.isSuccess()) {
						forceClose(child, future.cause());
					}
				});
        }
    }
}
/*xxx: httpserver 建造器 */
public abstract class HttpServer extends ServerTransport<HttpServer, HttpServerConfig> {
 	/*xxx: 创建服务器实例*/
	public static HttpServer create() {
		return HttpServerBind.INSTANCE;
	}
    
    /*xxx: 添加 事件处理器 */
	public final HttpServer handle(
			BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
		Objects.requireNonNull(handler, "handler");
		return childObserve(new HttpServerHandle(handler));
	}
    
    /*xxx: 添加路由处理 */
	public final HttpServer route(Consumer<? super HttpServerRoutes> routesBuilder) {
		Objects.requireNonNull(routesBuilder, "routeBuilder");
		HttpServerRoutes routes = HttpServerRoutes.newRoutes();
		/*xxx: 进行消费  */
		routesBuilder.accept(routes);
		/*xxx: 路由最终,仍然是通过 处理器进行处理的 */
		return handle(routes);
	}
    
    /*xxx: httpServer事件处理器 */
	static final class HttpServerHandle implements ConnectionObserver {
        
        final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler;
        
     	/*xxx: 状态变更回调 */
		public void onStateChange(Connection connection, State newState) {
         	/*xxx: 处理器进行处理 */
			Mono<Void> mono = Mono.fromDirect(handler.apply(ops, ops)); 
            
            /*xxx: 订阅并启动流处理 */
            mono.subscribe(ops.disposeSubscriber());
        }
    }
}
@FunctionalInterface
/*xxx: 连接观察器 */
public interface ConnectionObserver {
 	/*xxx: 状态变更回调 */
	void onStateChange(Connection connection, State newState);   
}

# 应用级容器基础设施-路由

  • 其本质上,与servlet处于一个抽象层次
/*xxx: 路由接口*/
public interface HttpServerRoutes extends  BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> {
 	   /*xxx: 实例化路由定义 */
	static HttpServerRoutes newRoutes() {
		return new DefaultHttpServerRoutes();
	}
}
final class DefaultHttpServerRoutes implements HttpServerRoutes {
 	/*xxx: 配置路由 */
	public HttpServerRoutes route(Predicate<? super HttpServerRequest> condition,
			BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {

		if (condition instanceof HttpPredicate) {
			handlers.add(new HttpRouteHandler(condition,
					handler,
					(HttpPredicate) condition));
		}
		else {
			handlers.add(new HttpRouteHandler(condition, handler, null));
		}
		return this;
	}
    
    /*xxx: 匹配路由,并执行*/
    public Publisher<Void> apply(HttpServerRequest request, HttpServerResponse response) {
     	   final Iterator<HttpRouteHandler> iterator = handlers.iterator();
           HttpRouteHandler cursor;
           while (iterator.hasNext()) {
				cursor = iterator.next();
				if (cursor.test(request)) {
					return cursor.apply(request, response);
				}
			}
    }
    
    /*xxx: 路由处理器 */
	static final class HttpRouteHandler implements BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>,
       Predicate<HttpServerRequest> {
         	final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>>
		                                                    handler;
           
           public Publisher<Void> apply(HttpServerRequest request, HttpServerResponse response) {
				return handler.apply(request.paramsResolver(resolver), response);
			}
           	
      }
}

# netty响应式容器使用示例

public class NettyReactorExample {
    public static void main(String[] args) throws InterruptedException {

        HttpServer.create()
                .host("0.0.0.0")
                .port(8077)
                .route(routes->{
                    routes.get("/hello",
                            (req, res) -> res.header("ContentType", "text/plain;")
                                    .sendString(Mono.just("Hello World!")));

                    routes.get("/test",
                            (req, res) -> res.header("ContentType", "text/plain;")
                                    .sendString(Mono.just("this is Test...")));
                })
                .bind().block();

        Thread thread =new Thread(()->{
            while (true){
                //doSomeThing
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        thread.run();

    }
}

# SpringBoot的集成容器

# 上下文体系

/*xxx: 基于响应式的服务上下文*/
public class GenericReactiveWebApplicationContext extends GenericApplicationContext
		implements ConfigurableReactiveWebApplicationContext {
 	//略...   
}
/*xxx: 基于响应式的服务上下文*/
public class ReactiveWebServerApplicationContext extends GenericReactiveWebApplicationContext
		implements ConfigurableWebServerApplicationContext {
    
    /*xxx: 服务容器管理器 */
	private volatile WebServerManager serverManager;

	/*xxx: 服务命名空间 */
	private String serverNamespace;
    
 	protected void onRefresh() {
     	/*xxx: 创建服务容器*/
		createWebServer();   
    }
    
    private void createWebServer() {
     	   /*xxx:检查 服务容器管理器 */
		WebServerManager serverManager = this.serverManager;
		if (serverManager == null) {
         	/*xxx: 获取服务容器工厂*/
			ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName);
            
            /*xxx: 初始化 服务容器管理器*/
			/*xxx: 服务容器的生命周期,由框架托管*/
			this.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit);
            
            /*xxx: 为服务容器,注入生命周期管理*/
			getBeanFactory().registerSingleton("webServerStartStop",
					new WebServerStartStopLifecycle(this.serverManager));
        }
    }
}
/*xxx: 服务容器管理器 */
class WebServerManager {
    
    private final WebServer webServer;
    
	void start() {
		this.handler.initializeHandler();
		/*xxx: 启动服务容器 */
		this.webServer.start();
		/*xxx: 服务容器启动后, 发布事件通知*/
		this.applicationContext
				.publishEvent(new ReactiveWebServerInitializedEvent(this.webServer, this.applicationContext));
	}    
}
class WebServerStartStopLifecycle implements SmartLifecycle {
 	/*xxx: 生命周期由spring上下文托管*/
	public void start() {
		/*xxx: 启动响应式服务容器 */
		this.weServerManager.start();
		this.running = true;
	}
    
    public void stop() {
		this.running = false;
		/*xxx: 停止响应式服务容器*/
		this.weServerManager.stop();
	}
}

# 响应式容器体系

/*xxx: 响应式服务容器工厂 */
public interface ReactiveWebServerFactory {
    
 	/*xxx: 获取服务器容器, 需要指定 httpHandler */
	WebServer getWebServer(HttpHandler httpHandler);
}
/*xxx: 基于 响应式的 服务器容器工厂 */
public abstract class AbstractReactiveWebServerFactory extends AbstractConfigurableWebServerFactory
		implements ConfigurableReactiveWebServerFactory {
	//略...
}
/*xxx: 基于netty实现的响应式服务器容器 */
public class NettyReactiveWebServerFactory extends AbstractReactiveWebServerFactory {
    
 	public WebServer getWebServer(HttpHandler httpHandler) {
		/*xxx: 创建服务器容器*/
		HttpServer httpServer = createHttpServer();
		/*xxx: 基于reactor 响应式适配器, 本质上遵循 reactor规范 */
		ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
		/*xxx: 创建netty服务容器  */
		NettyWebServer webServer = createNettyWebServer(httpServer, handlerAdapter, this.lifecycleTimeout,
				getShutdown());
		webServer.setRouteProviders(this.routeProviders);
		return webServer;
	}   
    
    /*xxx: 创建netty服务器*/
	NettyWebServer createNettyWebServer(HttpServer httpServer, ReactorHttpHandlerAdapter handlerAdapter,
			Duration lifecycleTimeout, Shutdown shutdown) {
		/*xxx: 创建netty服务器 */
		return new NettyWebServer(httpServer, handlerAdapter, lifecycleTimeout, shutdown);
	}
    
    private HttpServer createHttpServer() {
		/*xxx: 创建http服务器容器 */
		HttpServer server = HttpServer.create();
        
     	/*xxx: 监听地址*/
		server = server.bindAddress(this::getListenAddress);   
        
        /*xxx: 定义服务容器使用的协议 */
		server = server.protocol(listProtocols()).forwarded(this.useForwardHeaders);
		return applyCustomizers(server);
    }
}
/*xxx: 基于netty实现的服务容器 */
public class NettyWebServer implements WebServer {
 	
    private final HttpServer httpServer;
    
    private volatile DisposableServer disposableServer;
    
    /*xxx: 路由*/
	private List<NettyRouteProvider> routeProviders = Collections.emptyList();
    
    public NettyWebServer(HttpServer httpServer, ReactorHttpHandlerAdapter handlerAdapter, Duration lifecycleTimeout,
			Shutdown shutdown) {

		/*xxx: 请求处理适配器*/
		this.handler = handlerAdapter;
		/*xxx: 通过httpServer开启的信道,作为实际的服务处理器*/
		this.httpServer = httpServer.channelGroup(new DefaultChannelGroup(new DefaultEventExecutor()));
	}
    
    /*xxx: 启动服务容器 */
	public void start() throws WebServerException {
     	this.disposableServer = startHttpServer();   
    }
    
    /*xxx: 启动http服务器*/
	DisposableServer startHttpServer() {
		HttpServer server = this.httpServer;
		if (this.routeProviders.isEmpty()) {
			/*xxx: 为 http服务添加处理器 */
			server = server.handle(this.handler);
		}
		else {
			/*xxx: 为 http服务添加路由规则*/
			server = server.route(this::applyRouteProviders);
		}
		
		return server.bindNow();
	}
    
}

# webFlux响应式编程

# 响应式编程基础设施

# HttpHandler

/*xxx: reactive容器的组件定义 */
public interface HttpHandler {

    Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response);
}
/*xxx: reactive容器的组件定义 */
public interface ServerWebExchange {
 	
    ServerHttpRequest getRequest();
    
    ServerHttpResponse getResponse();
    
    Mono<WebSession> getSession();
    
    ApplicationContext getApplicationContext();
    
}

# WebHandler

public interface WebHandler {
 	Mono<Void> handle(ServerWebExchange exchange);   
}

# HttpWebHandlerAdapter

public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHandler {
    
 	public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
		if (this.forwardedHeaderTransformer != null) {
			request = this.forwardedHeaderTransformer.apply(request);
		}
		ServerWebExchange exchange = createExchange(request, response);

		return getDelegate().handle(exchange)
				.doOnSuccess(aVoid -> logResponse(exchange))
				.onErrorResume(ex -> handleUnresolvedError(exchange, ex))
				.then(Mono.defer(response::setComplete));
	}   
}
public class WebHandlerDecorator implements WebHandler {
 	private final WebHandler delegate;
    
    public Mono<Void> handle(ServerWebExchange exchange) {
		return this.delegate.handle(exchange);
	}
}

# 响应式编程配置及原理

# DispatcherHandler(类比DispatcherServlet)

public class DispatcherHandler implements WebHandler, ApplicationContextAware {
 	@Nullable
	/*xxx: handlerMapping定义*/
	private List<HandlerMapping> handlerMappings;

	@Nullable
	/*xxx: handlerAdapter定义*/
	private List<HandlerAdapter> handlerAdapters;

	@Nullable
	/*xxx: 结果处理器定义 */
	private List<HandlerResultHandler> resultHandlers;
    
    @Override
	public Mono<Void> handle(ServerWebExchange exchange) {
		if (this.handlerMappings == null) {
			return createNotFoundError();
		}
		return Flux.fromIterable(this.handlerMappings)
				.concatMap(mapping -> mapping.getHandler(exchange))
				.next()
				.switchIfEmpty(createNotFoundError())
				.flatMap(handler -> invokeHandler(exchange, handler))
				.flatMap(result -> handleResult(exchange, result));
	}
    
}

# HandlerMapping

/*xxx: 基于webflux的 handlerMapping*/
public interface HandlerMapping {
 	/*xxx: 获取handler处理器 */
	Mono<Object> getHandler(ServerWebExchange exchange);   
}
public abstract class AbstractHandlerMapping extends ApplicationObjectSupport
		implements HandlerMapping, Ordered, BeanNameAware {
 	//xxx: RequestMapping 默认使用该值作为顺序
	private int order = Ordered.LOWEST_PRECEDENCE; 
    
    public Mono<Object> getHandler(ServerWebExchange exchange) {
     	return getHandlerInternal(exchange).map(handler -> {
           	return handler; 
        });
    }
    
    /*xxx: 获取处理器 */
	protected abstract Mono<?> getHandlerInternal(ServerWebExchange exchange);
}
  • 基于RouterFunction
public class RouterFunctionMapping extends AbstractHandlerMapping implements InitializingBean {
 	/*xxx: 路由注册表 */
	private RouterFunction<?> routerFunction;
    
    public void afterPropertiesSet() throws Exception {
     	/*xxx: 初始化路由注册表 */
		initRouterFunctions();   
    }
    
    protected void initRouterFunctions() {
		List<RouterFunction<?>> routerFunctions = routerFunctions();
		this.routerFunction = routerFunctions.stream().reduce(RouterFunction::andOther).orElse(null);
	}
    
    private List<RouterFunction<?>> routerFunctions() {
		/*xxx: 从上下文中,获取 RouterFunction的bean*/
		List<RouterFunction<?>> functions = obtainApplicationContext()
				.getBeanProvider(RouterFunction.class)
				.orderedStream()
				.map(router -> (RouterFunction<?>)router)
				.collect(Collectors.toList());
		return (!CollectionUtils.isEmpty(functions) ? functions : Collections.emptyList());
	}
    
    @Override
	protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
		/*xxx: 基于 routerFunction 的处理器映射  */
		if (this.routerFunction != null) {
			ServerRequest request = ServerRequest.create(exchange, this.messageReaders);
			/*xxx: 路由映射,返回HandlerFunction */
			return this.routerFunction.route(request)
					.doOnNext(handler -> setAttributes(exchange.getAttributes(), request, handler));
		}
		else {
			return Mono.empty();
		}
	}
}
  • 基于RequestMapping
public abstract class AbstractHandlerMethodMapping<T> extends AbstractHandlerMapping implements InitializingBean {
    
    
    
 	/*xxx: 注册侧映射映射*/
	public void registerMapping(T mapping, Object handler, Method method) {
		this.mappingRegistry.register(mapping, handler, method);
	}   
    
    public void afterPropertiesSet() {

		/*xxx: 初始化requestMapping注册表 */
		initHandlerMethods();
    }
    
    public Mono<HandlerMethod> getHandlerInternal(ServerWebExchange exchange) {
		this.mappingRegistry.acquireReadLock();
		try {
			HandlerMethod handlerMethod;
			try {
				handlerMethod = lookupHandlerMethod(exchange);
			}
			catch (Exception ex) {
				return Mono.error(ex);
			}
			if (handlerMethod != null) {
				handlerMethod = handlerMethod.createWithResolvedBean();
			}
			return Mono.justOrEmpty(handlerMethod);
		}
		finally {
			this.mappingRegistry.releaseReadLock();
		}
	}
    
}
/*xxx: 基于 requestMapping的处理器映射 */
public abstract class RequestMappingInfoHandlerMapping extends AbstractHandlerMethodMapping<RequestMappingInfo> {
 	
    //略...
}
  • 基于RouterPredication(微服务网关)
/*xxx: routePredicate 的handlerMapping,基于webflux风格, 是网关的核心与webflux的接入点 */
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
 	/*xxx: handler处理器, webHandler实例 , 由 RouterAdapter适配器进行处理 ,也就是 HandlerFunctionAdapter*/
	private final FilteringWebHandler webHandler;
    
    /*xxx: 路由定位器 */
	private final RouteLocator routeLocator;
    
    @Override
	protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
        
        /*xxx: 查找路由, 返回 FilteringWebHandler */
		return lookupRoute(exchange)
				// .log("route-predicate-handler-mapping", Level.FINER) //name this
				.flatMap((Function<Route, Mono<?>>) r -> {
					exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
					if (logger.isDebugEnabled()) {
						logger.debug(
								"Mapping [" + getExchangeDesc(exchange) + "] to " + r);
					}

					exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
					return Mono.just(webHandler);
				}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
					exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
					if (logger.isTraceEnabled()) {
						logger.trace("No RouteDefinition found for ["
								+ getExchangeDesc(exchange) + "]");
					}
				})));
    }	
}

# HandlerAdapter

/*xxx: reactive风格的 handlerAdapter*/
public interface HandlerAdapter {
 	boolean supports(Object handler);
    
    Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler);
}
  • 基于HandlerFunction
/*xxx:reactive 基于路由器的 适配器*/
public class HandlerFunctionAdapter implements HandlerAdapter {
    
 	@Override
	/*xxx: 支持处理路由器返回的 handlerFunction*/
	public boolean supports(Object handler) {
		return handler instanceof HandlerFunction;
	}
    
    @Override
	public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
		HandlerFunction<?> handlerFunction = (HandlerFunction<?>) handler;
		ServerRequest request = exchange.getRequiredAttribute(RouterFunctions.REQUEST_ATTRIBUTE);
		/*xxx: 通过调用 handlerFunction ,并将结果包装成 handlerResult*/
		return handlerFunction.handle(request)
				.map(response -> new HandlerResult(handlerFunction, response, HANDLER_FUNCTION_RETURN_TYPE));
	}
    
}
  • 基于RequestMapping
/*xxx: reactive风格 基于 requestMapping 的 适配器  */
public class RequestMappingHandlerAdapter implements HandlerAdapter, ApplicationContextAware, InitializingBean {
 	@Override
	public boolean supports(Object handler) {
		return handler instanceof HandlerMethod;
	}
    
    @Override
	public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
     	HandlerMethod handlerMethod = (HandlerMethod) handler;
        
        InvocableHandlerMethod invocableMethod = this.methodResolver.getRequestMappingMethod(handlerMethod);
        
        return this.modelInitializer
				.initModel(handlerMethod, bindingContext, exchange)
				.then(Mono.defer(() -> invocableMethod.invoke(exchange, bindingContext)))
				.doOnNext(result -> result.setExceptionHandler(exceptionHandler))
				.doOnNext(result -> bindingContext.saveModel())
				.onErrorResume(exceptionHandler);
    }
}
  • 基于WebHandler
public class SimpleHandlerAdapter implements HandlerAdapter {

	@Override
	public boolean supports(Object handler) {
		return WebHandler.class.isAssignableFrom(handler.getClass());
	}

	@Override
	public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
		WebHandler webHandler = (WebHandler) handler;
		Mono<Void> mono = webHandler.handle(exchange);
		return mono.then(Mono.empty());
	}

}

# 响应式编程自动配置

# WebFlux配置器

/*xxx: webflux自动配置*/
public class WebFluxAutoConfiguration {
 	   
    @Import({ EnableWebFluxConfiguration.class })
	/*xxx: webflux配置器 */
	public static class WebFluxConfig implements WebFluxConfigurer {
     	/*xxx: 添加资源处理器 */
		public void addResourceHandlers(ResourceHandlerRegistry registry) {
        	//略...
        }
        
        /*xxx: 配置视图解析器 */
		public void configureViewResolvers(ViewResolverRegistry registry) {
			this.viewResolvers.orderedStream().forEach(registry::viewResolver);
		}
    }
}

# WebFlux配置

/*xxx: webflux自动配置*/
public class WebFluxAutoConfiguration {
    
    @EnableConfigurationProperties(WebProperties.class)
	/*xxx: 通过 @EnableWebFlux可以开启配置 */
	public static class EnableWebFluxConfiguration extends DelegatingWebFluxConfiguration {
        
     	/*xxx: 基于 requestMapping的 适配器 */
		/*xxx: 基于 router的 适配器 和 handlerMapping 已经在父类进行了配置 */
		protected RequestMappingHandlerAdapter createRequestMappingHandlerAdapter() {
			if (this.webFluxRegistrations != null) {
				RequestMappingHandlerAdapter adapter = this.webFluxRegistrations.getRequestMappingHandlerAdapter();
				if (adapter != null) {
					return adapter;
				}
			}
			return super.createRequestMappingHandlerAdapter();
		}
        
        /*xxx: 基于 requestMapping的 handlerMapping */
		protected RequestMappingHandlerMapping createRequestMappingHandlerMapping() {
			if (this.webFluxRegistrations != null) {
				RequestMappingHandlerMapping mapping = this.webFluxRegistrations.getRequestMappingHandlerMapping();
				if (mapping != null) {
					return mapping;
				}
			}
			return super.createRequestMappingHandlerMapping();
		}
    }
 	   
}
  • 资源处理等其它多个handlerMapping的配置(略)
public class WebFluxConfigurationSupport implements ApplicationContextAware {
	//略....
}	

# 微服务网关

# 基本原理

  • 遵循webFlux规范
/*xxx: routePredicate 的handlerMapping,基于webflux风格, 是网关的核心与webflux的接入点 */
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
    
    /*xxx: 路由定位器 */
	private final RouteLocator routeLocator;
    
 	/*xxx: 查找路由断言 */
	protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
     	/*xxx: 通过路由定位器,获取routes */
		return this.routeLocator.getRoutes();   
    }
}
/*xxx: 路由定位器 */
public interface RouteLocator {
 	Flux<Route> getRoutes();   
}
/*xxx: 基于路由描述的 路由定位器 */
public class RouteDefinitionRouteLocator
		implements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {
 	/*xxx: 路由描述定位器 */
	private final RouteDefinitionLocator routeDefinitionLocator;
    
    public Flux<Route> getRoutes() {
     	/*xxx: 委托路由描述定位器 获取路由描述,然后转为路由信息 */
		Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions()
				.map(this::convertToRoute);   
    }
    
    /*xxx: 将路由描述转换为路由 */
	private Route convertToRoute(RouteDefinition routeDefinition) {
		AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);
		/*xxx: 获取路由过滤器 */
		List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);

		return Route.async(routeDefinition).asyncPredicate(predicate)
				.replaceFilters(gatewayFilters).build();
	}
    
    /*xxx: 获取路由过滤器 */
	private List<GatewayFilter> getFilters(RouteDefinition routeDefinition) {
        
     	/*xxx: 如果配置了默认网关过滤器,则所有路由都会有该过滤器信息 */
		if (!this.gatewayProperties.getDefaultFilters().isEmpty()) {
			filters.addAll(loadGatewayFilters(DEFAULT_FILTERS,
					new ArrayList<>(this.gatewayProperties.getDefaultFilters())));
		}
        
        if (!routeDefinition.getFilters().isEmpty()) {
			/*xxx: 加载 网关过滤器 */
			filters.addAll(loadGatewayFilters(routeDefinition.getId(),
					new ArrayList<>(routeDefinition.getFilters())));
		}

		AnnotationAwareOrderComparator.sort(filters);
		return filters;
    }
    
    /*xxx: 加载网关过滤器 */
	List<GatewayFilter> loadGatewayFilters(String id,
			List<FilterDefinition> filterDefinitions) {
		/*xxx: 所有的过滤器,都会封装为 OrderedFilter,换言之,所有的过滤器,都必须有明确的顺序*/
		ArrayList<GatewayFilter> ordered = new ArrayList<>(filterDefinitions.size());
     	
        for (int i = 0; i < filterDefinitions.size(); i++) {
         	/*xxx: 通过网关过滤器工厂,获取配置的网关*/
			GatewayFilter gatewayFilter = factory.apply(configuration);
			/*xxx: 如果配置的网关,本身定义了过滤器,本身即是 Ordered,制定了顺序,则使用该顺序*/
			if (gatewayFilter instanceof Ordered) {
				ordered.add(gatewayFilter);
			}
			else {
				/*xxx: 否则,使用定义的顺序进行排序,注意,即 按照定义的时间先后进行排序*/
				/*xxx: 正常情况下,所有的网关过滤器,均位于 下游服务器处理之前 */
				ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1));
			}   
        }
        
        return ordered;
    }
}
  • handlerMapping-> routePredicate
    • routeLocator
    • routeDefinitionLocator
  • FilteringWebHandler

# 关键组件

# 路由仓库

public interface RouteDefinitionLocator {

	Flux<RouteDefinition> getRouteDefinitions();

}
/*xxx: 路由定义仓库,后续应该会扩展为 jdbc,或者其它介质存储*/
public class InMemoryRouteDefinitionRepository implements RouteDefinitionRepository {
 	public Mono<Void> save(Mono<RouteDefinition> route) {
     	//略...   
    }
    
    public Mono<Void> delete(Mono<String> routeId) {
     	//略...   
    }
}

# FilteringWebHandler处理器

/*xxx: 实际的处理器处理   */
public class FilteringWebHandler implements WebHandler {
 	private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
		/*xxx: 将所有的全局过滤器,转换为 网关过滤适配器 */
		return filters.stream().map(filter -> {
			GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
			if (filter instanceof Ordered) {
				int order = ((Ordered) filter).getOrder();
				return new OrderedGatewayFilter(gatewayFilter, order);
			}
			return gatewayFilter;
		}).collect(Collectors.toList());
	}
    
    @Override
	public Mono<Void> handle(ServerWebExchange exchange) {
		/*xxx: 获取当前路由断言信息 */
		Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
		/*xxx: 从路由断言中,获取 网关过滤器*/
		List<GatewayFilter> gatewayFilters = route.getFilters();

		/*xxx: 将全局过滤器,与网关过滤器拼接在一起 */
		List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
		combined.addAll(gatewayFilters);
		// TODO: needed or cached?
		/*xxx: 进行排序,  如果不遵循规范,则 全局过滤器不一定位于所有的网关过滤器之前  */
		AnnotationAwareOrderComparator.sort(combined);

		/*xxx: 将全局过滤器与网关过滤器进行拼接后,封装为过滤链,进行过滤*/
		return new DefaultGatewayFilterChain(combined).filter(exchange);
	}
    
    private static class DefaultGatewayFilterChain implements GatewayFilterChain {
     	/*xxx: 执行过滤连链流程*/
		public Mono<Void> filter(ServerWebExchange exchange) {
         	return Mono.defer(() -> {
				if (this.index < filters.size()) {
					GatewayFilter filter = filters.get(this.index);
					DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
							this.index + 1);
					return filter.filter(exchange, chain);
				}
				else {
					/*xxx: 执行到最后一个过滤器时,调用 complete回调 */
					return Mono.empty(); // complete
				}
			});   
        }
    }
}

# 过滤器说明

  • 网上有些说法:全局过滤器在网关过滤器之前执行,这是错误的,且大错特错,误人子弟

# 全局过滤器

  • 官方的全局过滤器,均实现了ordered接口
名称 顺序 作用
RemoveCachedBodyFilter Integer.MIN_VALUE 响应结束后移除缓存
AdaptCachedBodyGlobalFilter Integer.MIN_VALUE+1000 缓存请求实体等信息
NettyWriteResponseFilter -1 响应结束后,写出响应体,以及异常等处理
WebClientWriteResponseFilter -1(默认未启用) 同上
ForwardPathFilter 0 将配置的forward,写入url中
GatewayMetricsFilter 0 监控
RouteToRequestUrlFilter 10000 进行协议转换适配
LoadBalancerClientFilter 10100 负载均衡,解析节点实例
ReactiveLoadBalancerClientFilter 10150 负载均衡,解析节点实例
WebsocketRoutingFilter Integer.MAX_VALUE-1 websocket转发
ForwardRoutingFilter Integer.MAX_VALUE 站内转发,转发到requestMapping资源处理
NettyRoutingFilter Integer.MAX_VALUE 站外转发
WebClientHttpRoutingFilter Integer.MAX_VALUE(未启用) 同上

# 网关过滤器

  • 所有的网关过滤器,均会被转为Ordered类型,即所有的网关过滤器,都需要有明确的顺序
  • 通过实现Ordered可以指定具体的顺序
  • 没有实现Ordered接口的网关过滤器,将会按照声明的顺序,从1开始,依次排开;
  • 绝大部分情况下,网关过滤器都在下游服务处理之前处理,但也可以通过配置,实现后置处理,实际上,在之前的过滤器,也能够实现后置处理;