# 响应式编程
# java的响应式发展历程
java1.1
中,引入了Observer
和Observable
接口,提供了一种基本的观察者模式实现方式java5
中,引入了并发包java.util.concurrent
,其中包括了Future
和Executor
接口等,使得java
程序可以更好地处理异步任务java8
中,引入了Stream API
,提供了一种基于函数式编程的流式处理,使得数据处理更加高效和简洁java9
中,引入了Flow API
,提供了一种基于反应流
的编程方式,使得程序可以更加响应式地处理数据流java10
中,引入了Var
关键字,可以更加简洁地处理数据类型,从而更好的支持函数式编程java11
中,引入了HttpClient API
,提供了一种基于异步IO的http
客户端实现,可以更好的处理网络请求;
# reactor响应式编程
Reactor 1.x
,发布于2013年,基于java6,提供基础的响应式编程功能,提供了Flux
和Mono
来处理数据流Reactor 2.x
,发布于2014年,基于java7,内部集成了spring框架Reactor 3.x
,发布于2016年,基于java8,引入了函数式编程的思想,同时支持异步IO和反应流编程,能够更好的处理高负载和高并发;Reactor 3.3
,发布于2018年,实现了对java9的支持,包括支持java9的Flow API
;Reactor 3.4
,发布于2020年,实现了对java11的支持,同时增加了对Kotlin和Scala等语言的支持;
# reactor源码解读
# 响应式流规范
/*xxx: 响应式编程接口, 发布者*/
/*xxx: 表示一种能够产生数据流的组件*/
public interface Publisher<T> {
/*xxx: 订阅*/
public void subscribe(Subscriber<? super T> s);
}
/*xxx: 响应式编程接口 订阅者*/
/*xxx: 表示一种能够响应数据流中的数据并进行相应处理的组件*/
public interface Subscriber<T> {
/*xxx: 订阅回调 */
public void onSubscribe(Subscription s);
/*xxx: 事件下一步处理*/
public void onNext(T t);
/*xxx: 出错回调 */
public void onError(Throwable t);
public void onComplete();
}
/*xxx: 订阅控制*/
/*xxx: 表示一种能够控制数据流的组件*/
public interface Subscription {
/*xxx: 触发流处理 */
/*xxx: 请求Publisher发送指定数量的元素, Publisher会根据请求的数量发送元素, 知道满足请求或者数据流被处理完成*/
public void request(long n);
/*xxx: 订阅者在处理完所有的元素后,应该调用该方法通知Publisher取消订阅,以避免资源浪费和内存泄漏*/
public void cancel();
}
/*xxx: 处理器,既是消费者,也是订阅者*/
/*xxx: 表示一种能够接收输入数据,并产出输出数据的组件*/
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
# flux响应式编程规范
/*xxx: reactor 发布者*/
public interface CorePublisher<T> extends Publisher<T> {
/*xxx: 订阅*/
void subscribe(CoreSubscriber<? super T> subscriber);
}
/*xxx: 订阅者*/
public interface CoreSubscriber<T> extends Subscriber<T> {
@Override
void onSubscribe(Subscription s);
}
/*xxx: 多元素反应堆 接口*/
public abstract class Flux<T> implements CorePublisher<T> {
/*xxx: 构造反应堆流实例*/
public static <T> Flux<T> just(T data) {
return onAssembly(new FluxJust<>(data));
}
/*xxx: 派发事件 */
public final Flux<T> publishOn(Scheduler scheduler, int prefetch) {
return publishOn(scheduler, true, prefetch);
}
/*xxx: 派发事件 */
final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch, int lowTide) {
/*xxx: 派发事件 */
return onAssembly(new FluxPublishOn<>(this, scheduler, delayError, prefetch, lowTide, Queues.get(prefetch)));
}
/*xxx: 通过消费者进行订阅*/
public final Disposable subscribe(Consumer<? super T> consumer) {
Objects.requireNonNull(consumer, "consumer");
return subscribe(consumer, null, null);
}
}
/*xxx:发送单个事件之后即完成的 反应堆流*/
final class FluxJust<T> extends Flux<T>
implements Fuseable.ScalarCallable<T>, Fuseable,SourceProducer<T> {
/*xxx: 订阅事件*/
public void subscribe(final CoreSubscriber<? super T> actual) {
/*xxx: scala订阅控制 */
actual.onSubscribe(Operators.scalarSubscription(actual, value, "just"));
}
}
/*xxx: 发送定额数组事件 的反应堆流*/
final class FluxArray<T> extends Flux<T> implements Fuseable, SourceProducer<T> {
public static <T> void subscribe(CoreSubscriber<? super T> s, T[] array) {
/*xxx: 数组订阅控制*/
s.onSubscribe(new ArraySubscription<>(s, array));
}
/*xxx: 订阅事件 */
public void subscribe(CoreSubscriber<? super T> actual) {
subscribe(actual, array);
}
/*xxx: 数组订阅控制 */
static final class ArraySubscription<T>
implements InnerProducer<T>, SynchronousSubscription<T> {
@Override /*xxx: 进行订阅请求 */
public void request(long n) {
if (Operators.validate(n)) {
if (Operators.addCap(REQUESTED, this, n) == 0) {
if (n == Long.MAX_VALUE) {
fastPath();
}
else {
slowPath(n);
}
}
}
}
void fastPath() {
final T[] a = array;
final int len = a.length;
final Subscriber<? super T> s = actual;
for (int i = index; i != len; i++) {
if (cancelled) {
return;
}
T t = a[i];
if (t == null) {
s.onError(new NullPointerException("The " + i + "th array element was null"));
return;
}
/*xxx: 调用 onNext回调*/
s.onNext(t);
}
if (cancelled) {
return;
} /*xxx: 调用流处理完成的回调*/
s.onComplete();
}
}
}
public class FluxExample {
public static void main(String[] args) {
//xxx: 除了消费者订阅外,还可以通过订阅者订阅
/*xxx: 消费者,后续也是通过被包装为订阅者实现的 */
Flux.just("hello","world").subscribe((t)->{
System.out.println("consumer callback");
System.out.println(t);
});
Flux.just("hi","reactor").subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("complete");
}
});
}
}
# mono响应式编程规范
public abstract class Mono<T> implements CorePublisher<T> {
//xxx: 略...
}
final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {
public final void onSubscribe(Subscription s) {
if (Operators.validate(subscription, s)) {
this.subscription = s;
if (subscriptionConsumer != null) {
try {
subscriptionConsumer.accept(s);
}
catch (Throwable t) {
Exceptions.throwIfFatal(t);
s.cancel();
onError(t);
}
}
else { /*xxx: 订阅回调触发 request,触发 流操作*/
s.request(Long.MAX_VALUE);
}
}
}
@Override /*xxx: 流操作回调*/
public final void onNext(T x) {
/*xxx: 消费者处理*/
consumer.accept(x);
}
}
public class MonoExample {
public static void main(String[] args) {
Mono.just("hello").subscribe((s)->{
System.out.println(s);
});
Mono.just("hello").subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("complete");
}
});
}
}
# reactor原理解读(个人理解)
- 需要处理的数据,抽象为事件
- 响应式编程中,发布者用于产生事件源,由Publisher接口定义,其中,Flux与Mono是该接口的两种实现;
- 订阅者可以订阅事件,并进行流式消费,订阅者订阅事件,是通过与发布者交互完成的。
- 订阅者一旦订阅完成,即开始处理事件,处理事件是通过Subscription进行控制的,主要体现在
request
方法 - request方法执行后,会调用订阅者的onNext回调接口,该接口用于消费事件,并且可以将处理的结果传递给下游订阅者继续处理;
# 命名由来
- Flux与Mono响应式编程模型,来源于化学中的反应堆模型
- Mono源于单个反应堆的概念,因此Mono代表单个元素的流
- Flux源于连续反应堆的概念,因此Flux代表多个元素的流
- 化学中,反应堆是用来进行化学反应的设备或容器;
# 个人解读
编程中,从以前的同步思路,转换为异步思路,可以形象理解为方法到这了,但是不一定立即执行,即有需要才执行
通过事件机制,使得跨线程沟通协作变得简单,这是其尤为重要的特性
# Netty响应式
# Netty官方源码整合
/*xxx: 传输层,用于处理网络传输层的相关逻辑, 包括TCP和UDP协议的编解码,数据读写,连接管理 */
public abstract class Transport<T extends Transport<T, C>, C extends TransportConfig> {
//xxx: 略...
}
public abstract class ServerTransport<T extends ServerTransport<T, CONF>,CONF extends ServerTransportConfig<CONF>> extends Transport<T, CONF> {
/*xxx: 绑定地址 */
public Mono<? extends DisposableServer> bind() {
/*xxx: 创建Mono反应堆 */
Mono<? extends DisposableServer> mono = Mono.create(sink -> {
/*xxx: 处理连接请求,传入了 信道处理器参数 */
Acceptor acceptor = new Acceptor(config.childEventLoopGroup(), config.channelInitializer(childObs, null, true),
config.childOptions, config.childAttrs, isDomainSocket);
/*xxx: 通过sink参数,贯穿控制 */
if (local instanceof DomainSocketAddress) {
isDomainSocket = true;
disposableServer = new UdsDisposableBind(sink, config, local);
}
else {
disposableServer = new InetDisposableBind(sink, config, local);
}
/*xxx: 订阅并启动流*/
TransportConnector.bind(config, new AcceptorInitializer(acceptor), local, isDomainSocket).subscribe(disposableServer);
};
if (config.doOnBind() != null) {
/*xxx: 绑定成功后,订阅并启动流*/
mono = mono.doOnSubscribe(s -> config.doOnBind().accept(config));
}
return mono;
}
/*xxx: 绑定监听*/
public final DisposableServer bindNow() {
return bindNow(Duration.ofSeconds(45));
}
/*xxx:处理连接请求 */
static class Acceptor extends ChannelInboundHandlerAdapter {
/*xxx: 信道读取事件 */
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
/*xxx: 添加信道处理器 */
child.pipeline().addLast(childHandler);
childGroup.register(child).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
});
}
}
}
/*xxx: httpserver 建造器 */
public abstract class HttpServer extends ServerTransport<HttpServer, HttpServerConfig> {
/*xxx: 创建服务器实例*/
public static HttpServer create() {
return HttpServerBind.INSTANCE;
}
/*xxx: 添加 事件处理器 */
public final HttpServer handle(
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
Objects.requireNonNull(handler, "handler");
return childObserve(new HttpServerHandle(handler));
}
/*xxx: 添加路由处理 */
public final HttpServer route(Consumer<? super HttpServerRoutes> routesBuilder) {
Objects.requireNonNull(routesBuilder, "routeBuilder");
HttpServerRoutes routes = HttpServerRoutes.newRoutes();
/*xxx: 进行消费 */
routesBuilder.accept(routes);
/*xxx: 路由最终,仍然是通过 处理器进行处理的 */
return handle(routes);
}
/*xxx: httpServer事件处理器 */
static final class HttpServerHandle implements ConnectionObserver {
final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler;
/*xxx: 状态变更回调 */
public void onStateChange(Connection connection, State newState) {
/*xxx: 处理器进行处理 */
Mono<Void> mono = Mono.fromDirect(handler.apply(ops, ops));
/*xxx: 订阅并启动流处理 */
mono.subscribe(ops.disposeSubscriber());
}
}
}
@FunctionalInterface
/*xxx: 连接观察器 */
public interface ConnectionObserver {
/*xxx: 状态变更回调 */
void onStateChange(Connection connection, State newState);
}
# 应用级容器基础设施-路由
- 其本质上,与servlet处于一个抽象层次
/*xxx: 路由接口*/
public interface HttpServerRoutes extends BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> {
/*xxx: 实例化路由定义 */
static HttpServerRoutes newRoutes() {
return new DefaultHttpServerRoutes();
}
}
final class DefaultHttpServerRoutes implements HttpServerRoutes {
/*xxx: 配置路由 */
public HttpServerRoutes route(Predicate<? super HttpServerRequest> condition,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
if (condition instanceof HttpPredicate) {
handlers.add(new HttpRouteHandler(condition,
handler,
(HttpPredicate) condition));
}
else {
handlers.add(new HttpRouteHandler(condition, handler, null));
}
return this;
}
/*xxx: 匹配路由,并执行*/
public Publisher<Void> apply(HttpServerRequest request, HttpServerResponse response) {
final Iterator<HttpRouteHandler> iterator = handlers.iterator();
HttpRouteHandler cursor;
while (iterator.hasNext()) {
cursor = iterator.next();
if (cursor.test(request)) {
return cursor.apply(request, response);
}
}
}
/*xxx: 路由处理器 */
static final class HttpRouteHandler implements BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>,
Predicate<HttpServerRequest> {
final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>>
handler;
public Publisher<Void> apply(HttpServerRequest request, HttpServerResponse response) {
return handler.apply(request.paramsResolver(resolver), response);
}
}
}
# netty响应式容器使用示例
public class NettyReactorExample {
public static void main(String[] args) throws InterruptedException {
HttpServer.create()
.host("0.0.0.0")
.port(8077)
.route(routes->{
routes.get("/hello",
(req, res) -> res.header("ContentType", "text/plain;")
.sendString(Mono.just("Hello World!")));
routes.get("/test",
(req, res) -> res.header("ContentType", "text/plain;")
.sendString(Mono.just("this is Test...")));
})
.bind().block();
Thread thread =new Thread(()->{
while (true){
//doSomeThing
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.run();
}
}
# SpringBoot的集成容器
# 上下文体系
/*xxx: 基于响应式的服务上下文*/
public class GenericReactiveWebApplicationContext extends GenericApplicationContext
implements ConfigurableReactiveWebApplicationContext {
//略...
}
/*xxx: 基于响应式的服务上下文*/
public class ReactiveWebServerApplicationContext extends GenericReactiveWebApplicationContext
implements ConfigurableWebServerApplicationContext {
/*xxx: 服务容器管理器 */
private volatile WebServerManager serverManager;
/*xxx: 服务命名空间 */
private String serverNamespace;
protected void onRefresh() {
/*xxx: 创建服务容器*/
createWebServer();
}
private void createWebServer() {
/*xxx:检查 服务容器管理器 */
WebServerManager serverManager = this.serverManager;
if (serverManager == null) {
/*xxx: 获取服务容器工厂*/
ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName);
/*xxx: 初始化 服务容器管理器*/
/*xxx: 服务容器的生命周期,由框架托管*/
this.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit);
/*xxx: 为服务容器,注入生命周期管理*/
getBeanFactory().registerSingleton("webServerStartStop",
new WebServerStartStopLifecycle(this.serverManager));
}
}
}
/*xxx: 服务容器管理器 */
class WebServerManager {
private final WebServer webServer;
void start() {
this.handler.initializeHandler();
/*xxx: 启动服务容器 */
this.webServer.start();
/*xxx: 服务容器启动后, 发布事件通知*/
this.applicationContext
.publishEvent(new ReactiveWebServerInitializedEvent(this.webServer, this.applicationContext));
}
}
class WebServerStartStopLifecycle implements SmartLifecycle {
/*xxx: 生命周期由spring上下文托管*/
public void start() {
/*xxx: 启动响应式服务容器 */
this.weServerManager.start();
this.running = true;
}
public void stop() {
this.running = false;
/*xxx: 停止响应式服务容器*/
this.weServerManager.stop();
}
}
# 响应式容器体系
/*xxx: 响应式服务容器工厂 */
public interface ReactiveWebServerFactory {
/*xxx: 获取服务器容器, 需要指定 httpHandler */
WebServer getWebServer(HttpHandler httpHandler);
}
/*xxx: 基于 响应式的 服务器容器工厂 */
public abstract class AbstractReactiveWebServerFactory extends AbstractConfigurableWebServerFactory
implements ConfigurableReactiveWebServerFactory {
//略...
}
/*xxx: 基于netty实现的响应式服务器容器 */
public class NettyReactiveWebServerFactory extends AbstractReactiveWebServerFactory {
public WebServer getWebServer(HttpHandler httpHandler) {
/*xxx: 创建服务器容器*/
HttpServer httpServer = createHttpServer();
/*xxx: 基于reactor 响应式适配器, 本质上遵循 reactor规范 */
ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
/*xxx: 创建netty服务容器 */
NettyWebServer webServer = createNettyWebServer(httpServer, handlerAdapter, this.lifecycleTimeout,
getShutdown());
webServer.setRouteProviders(this.routeProviders);
return webServer;
}
/*xxx: 创建netty服务器*/
NettyWebServer createNettyWebServer(HttpServer httpServer, ReactorHttpHandlerAdapter handlerAdapter,
Duration lifecycleTimeout, Shutdown shutdown) {
/*xxx: 创建netty服务器 */
return new NettyWebServer(httpServer, handlerAdapter, lifecycleTimeout, shutdown);
}
private HttpServer createHttpServer() {
/*xxx: 创建http服务器容器 */
HttpServer server = HttpServer.create();
/*xxx: 监听地址*/
server = server.bindAddress(this::getListenAddress);
/*xxx: 定义服务容器使用的协议 */
server = server.protocol(listProtocols()).forwarded(this.useForwardHeaders);
return applyCustomizers(server);
}
}
/*xxx: 基于netty实现的服务容器 */
public class NettyWebServer implements WebServer {
private final HttpServer httpServer;
private volatile DisposableServer disposableServer;
/*xxx: 路由*/
private List<NettyRouteProvider> routeProviders = Collections.emptyList();
public NettyWebServer(HttpServer httpServer, ReactorHttpHandlerAdapter handlerAdapter, Duration lifecycleTimeout,
Shutdown shutdown) {
/*xxx: 请求处理适配器*/
this.handler = handlerAdapter;
/*xxx: 通过httpServer开启的信道,作为实际的服务处理器*/
this.httpServer = httpServer.channelGroup(new DefaultChannelGroup(new DefaultEventExecutor()));
}
/*xxx: 启动服务容器 */
public void start() throws WebServerException {
this.disposableServer = startHttpServer();
}
/*xxx: 启动http服务器*/
DisposableServer startHttpServer() {
HttpServer server = this.httpServer;
if (this.routeProviders.isEmpty()) {
/*xxx: 为 http服务添加处理器 */
server = server.handle(this.handler);
}
else {
/*xxx: 为 http服务添加路由规则*/
server = server.route(this::applyRouteProviders);
}
return server.bindNow();
}
}
# webFlux响应式编程
# 响应式编程基础设施
# HttpHandler
/*xxx: reactive容器的组件定义 */
public interface HttpHandler {
Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response);
}
/*xxx: reactive容器的组件定义 */
public interface ServerWebExchange {
ServerHttpRequest getRequest();
ServerHttpResponse getResponse();
Mono<WebSession> getSession();
ApplicationContext getApplicationContext();
}
# WebHandler
public interface WebHandler {
Mono<Void> handle(ServerWebExchange exchange);
}
# HttpWebHandlerAdapter
public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHandler {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.forwardedHeaderTransformer != null) {
request = this.forwardedHeaderTransformer.apply(request);
}
ServerWebExchange exchange = createExchange(request, response);
return getDelegate().handle(exchange)
.doOnSuccess(aVoid -> logResponse(exchange))
.onErrorResume(ex -> handleUnresolvedError(exchange, ex))
.then(Mono.defer(response::setComplete));
}
}
public class WebHandlerDecorator implements WebHandler {
private final WebHandler delegate;
public Mono<Void> handle(ServerWebExchange exchange) {
return this.delegate.handle(exchange);
}
}
# 响应式编程配置及原理
# DispatcherHandler(类比DispatcherServlet)
public class DispatcherHandler implements WebHandler, ApplicationContextAware {
@Nullable
/*xxx: handlerMapping定义*/
private List<HandlerMapping> handlerMappings;
@Nullable
/*xxx: handlerAdapter定义*/
private List<HandlerAdapter> handlerAdapters;
@Nullable
/*xxx: 结果处理器定义 */
private List<HandlerResultHandler> resultHandlers;
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
return Flux.fromIterable(this.handlerMappings)
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
.switchIfEmpty(createNotFoundError())
.flatMap(handler -> invokeHandler(exchange, handler))
.flatMap(result -> handleResult(exchange, result));
}
}
# HandlerMapping
/*xxx: 基于webflux的 handlerMapping*/
public interface HandlerMapping {
/*xxx: 获取handler处理器 */
Mono<Object> getHandler(ServerWebExchange exchange);
}
public abstract class AbstractHandlerMapping extends ApplicationObjectSupport
implements HandlerMapping, Ordered, BeanNameAware {
//xxx: RequestMapping 默认使用该值作为顺序
private int order = Ordered.LOWEST_PRECEDENCE;
public Mono<Object> getHandler(ServerWebExchange exchange) {
return getHandlerInternal(exchange).map(handler -> {
return handler;
});
}
/*xxx: 获取处理器 */
protected abstract Mono<?> getHandlerInternal(ServerWebExchange exchange);
}
- 基于RouterFunction
public class RouterFunctionMapping extends AbstractHandlerMapping implements InitializingBean {
/*xxx: 路由注册表 */
private RouterFunction<?> routerFunction;
public void afterPropertiesSet() throws Exception {
/*xxx: 初始化路由注册表 */
initRouterFunctions();
}
protected void initRouterFunctions() {
List<RouterFunction<?>> routerFunctions = routerFunctions();
this.routerFunction = routerFunctions.stream().reduce(RouterFunction::andOther).orElse(null);
}
private List<RouterFunction<?>> routerFunctions() {
/*xxx: 从上下文中,获取 RouterFunction的bean*/
List<RouterFunction<?>> functions = obtainApplicationContext()
.getBeanProvider(RouterFunction.class)
.orderedStream()
.map(router -> (RouterFunction<?>)router)
.collect(Collectors.toList());
return (!CollectionUtils.isEmpty(functions) ? functions : Collections.emptyList());
}
@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
/*xxx: 基于 routerFunction 的处理器映射 */
if (this.routerFunction != null) {
ServerRequest request = ServerRequest.create(exchange, this.messageReaders);
/*xxx: 路由映射,返回HandlerFunction */
return this.routerFunction.route(request)
.doOnNext(handler -> setAttributes(exchange.getAttributes(), request, handler));
}
else {
return Mono.empty();
}
}
}
- 基于RequestMapping
public abstract class AbstractHandlerMethodMapping<T> extends AbstractHandlerMapping implements InitializingBean {
/*xxx: 注册侧映射映射*/
public void registerMapping(T mapping, Object handler, Method method) {
this.mappingRegistry.register(mapping, handler, method);
}
public void afterPropertiesSet() {
/*xxx: 初始化requestMapping注册表 */
initHandlerMethods();
}
public Mono<HandlerMethod> getHandlerInternal(ServerWebExchange exchange) {
this.mappingRegistry.acquireReadLock();
try {
HandlerMethod handlerMethod;
try {
handlerMethod = lookupHandlerMethod(exchange);
}
catch (Exception ex) {
return Mono.error(ex);
}
if (handlerMethod != null) {
handlerMethod = handlerMethod.createWithResolvedBean();
}
return Mono.justOrEmpty(handlerMethod);
}
finally {
this.mappingRegistry.releaseReadLock();
}
}
}
/*xxx: 基于 requestMapping的处理器映射 */
public abstract class RequestMappingInfoHandlerMapping extends AbstractHandlerMethodMapping<RequestMappingInfo> {
//略...
}
- 基于RouterPredication(微服务网关)
/*xxx: routePredicate 的handlerMapping,基于webflux风格, 是网关的核心与webflux的接入点 */
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
/*xxx: handler处理器, webHandler实例 , 由 RouterAdapter适配器进行处理 ,也就是 HandlerFunctionAdapter*/
private final FilteringWebHandler webHandler;
/*xxx: 路由定位器 */
private final RouteLocator routeLocator;
@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
/*xxx: 查找路由, 返回 FilteringWebHandler */
return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug(
"Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for ["
+ getExchangeDesc(exchange) + "]");
}
})));
}
}
# HandlerAdapter
/*xxx: reactive风格的 handlerAdapter*/
public interface HandlerAdapter {
boolean supports(Object handler);
Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler);
}
- 基于HandlerFunction
/*xxx:reactive 基于路由器的 适配器*/
public class HandlerFunctionAdapter implements HandlerAdapter {
@Override
/*xxx: 支持处理路由器返回的 handlerFunction*/
public boolean supports(Object handler) {
return handler instanceof HandlerFunction;
}
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
HandlerFunction<?> handlerFunction = (HandlerFunction<?>) handler;
ServerRequest request = exchange.getRequiredAttribute(RouterFunctions.REQUEST_ATTRIBUTE);
/*xxx: 通过调用 handlerFunction ,并将结果包装成 handlerResult*/
return handlerFunction.handle(request)
.map(response -> new HandlerResult(handlerFunction, response, HANDLER_FUNCTION_RETURN_TYPE));
}
}
- 基于RequestMapping
/*xxx: reactive风格 基于 requestMapping 的 适配器 */
public class RequestMappingHandlerAdapter implements HandlerAdapter, ApplicationContextAware, InitializingBean {
@Override
public boolean supports(Object handler) {
return handler instanceof HandlerMethod;
}
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
HandlerMethod handlerMethod = (HandlerMethod) handler;
InvocableHandlerMethod invocableMethod = this.methodResolver.getRequestMappingMethod(handlerMethod);
return this.modelInitializer
.initModel(handlerMethod, bindingContext, exchange)
.then(Mono.defer(() -> invocableMethod.invoke(exchange, bindingContext)))
.doOnNext(result -> result.setExceptionHandler(exceptionHandler))
.doOnNext(result -> bindingContext.saveModel())
.onErrorResume(exceptionHandler);
}
}
- 基于WebHandler
public class SimpleHandlerAdapter implements HandlerAdapter {
@Override
public boolean supports(Object handler) {
return WebHandler.class.isAssignableFrom(handler.getClass());
}
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
WebHandler webHandler = (WebHandler) handler;
Mono<Void> mono = webHandler.handle(exchange);
return mono.then(Mono.empty());
}
}
# 响应式编程自动配置
# WebFlux配置器
/*xxx: webflux自动配置*/
public class WebFluxAutoConfiguration {
@Import({ EnableWebFluxConfiguration.class })
/*xxx: webflux配置器 */
public static class WebFluxConfig implements WebFluxConfigurer {
/*xxx: 添加资源处理器 */
public void addResourceHandlers(ResourceHandlerRegistry registry) {
//略...
}
/*xxx: 配置视图解析器 */
public void configureViewResolvers(ViewResolverRegistry registry) {
this.viewResolvers.orderedStream().forEach(registry::viewResolver);
}
}
}
# WebFlux配置
/*xxx: webflux自动配置*/
public class WebFluxAutoConfiguration {
@EnableConfigurationProperties(WebProperties.class)
/*xxx: 通过 @EnableWebFlux可以开启配置 */
public static class EnableWebFluxConfiguration extends DelegatingWebFluxConfiguration {
/*xxx: 基于 requestMapping的 适配器 */
/*xxx: 基于 router的 适配器 和 handlerMapping 已经在父类进行了配置 */
protected RequestMappingHandlerAdapter createRequestMappingHandlerAdapter() {
if (this.webFluxRegistrations != null) {
RequestMappingHandlerAdapter adapter = this.webFluxRegistrations.getRequestMappingHandlerAdapter();
if (adapter != null) {
return adapter;
}
}
return super.createRequestMappingHandlerAdapter();
}
/*xxx: 基于 requestMapping的 handlerMapping */
protected RequestMappingHandlerMapping createRequestMappingHandlerMapping() {
if (this.webFluxRegistrations != null) {
RequestMappingHandlerMapping mapping = this.webFluxRegistrations.getRequestMappingHandlerMapping();
if (mapping != null) {
return mapping;
}
}
return super.createRequestMappingHandlerMapping();
}
}
}
- 资源处理等其它多个handlerMapping的配置(略)
public class WebFluxConfigurationSupport implements ApplicationContextAware {
//略....
}
# 微服务网关
# 基本原理
- 遵循webFlux规范
/*xxx: routePredicate 的handlerMapping,基于webflux风格, 是网关的核心与webflux的接入点 */
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
/*xxx: 路由定位器 */
private final RouteLocator routeLocator;
/*xxx: 查找路由断言 */
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
/*xxx: 通过路由定位器,获取routes */
return this.routeLocator.getRoutes();
}
}
/*xxx: 路由定位器 */
public interface RouteLocator {
Flux<Route> getRoutes();
}
/*xxx: 基于路由描述的 路由定位器 */
public class RouteDefinitionRouteLocator
implements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {
/*xxx: 路由描述定位器 */
private final RouteDefinitionLocator routeDefinitionLocator;
public Flux<Route> getRoutes() {
/*xxx: 委托路由描述定位器 获取路由描述,然后转为路由信息 */
Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions()
.map(this::convertToRoute);
}
/*xxx: 将路由描述转换为路由 */
private Route convertToRoute(RouteDefinition routeDefinition) {
AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);
/*xxx: 获取路由过滤器 */
List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);
return Route.async(routeDefinition).asyncPredicate(predicate)
.replaceFilters(gatewayFilters).build();
}
/*xxx: 获取路由过滤器 */
private List<GatewayFilter> getFilters(RouteDefinition routeDefinition) {
/*xxx: 如果配置了默认网关过滤器,则所有路由都会有该过滤器信息 */
if (!this.gatewayProperties.getDefaultFilters().isEmpty()) {
filters.addAll(loadGatewayFilters(DEFAULT_FILTERS,
new ArrayList<>(this.gatewayProperties.getDefaultFilters())));
}
if (!routeDefinition.getFilters().isEmpty()) {
/*xxx: 加载 网关过滤器 */
filters.addAll(loadGatewayFilters(routeDefinition.getId(),
new ArrayList<>(routeDefinition.getFilters())));
}
AnnotationAwareOrderComparator.sort(filters);
return filters;
}
/*xxx: 加载网关过滤器 */
List<GatewayFilter> loadGatewayFilters(String id,
List<FilterDefinition> filterDefinitions) {
/*xxx: 所有的过滤器,都会封装为 OrderedFilter,换言之,所有的过滤器,都必须有明确的顺序*/
ArrayList<GatewayFilter> ordered = new ArrayList<>(filterDefinitions.size());
for (int i = 0; i < filterDefinitions.size(); i++) {
/*xxx: 通过网关过滤器工厂,获取配置的网关*/
GatewayFilter gatewayFilter = factory.apply(configuration);
/*xxx: 如果配置的网关,本身定义了过滤器,本身即是 Ordered,制定了顺序,则使用该顺序*/
if (gatewayFilter instanceof Ordered) {
ordered.add(gatewayFilter);
}
else {
/*xxx: 否则,使用定义的顺序进行排序,注意,即 按照定义的时间先后进行排序*/
/*xxx: 正常情况下,所有的网关过滤器,均位于 下游服务器处理之前 */
ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1));
}
}
return ordered;
}
}
- handlerMapping-> routePredicate
- routeLocator
- routeDefinitionLocator
- FilteringWebHandler
# 关键组件
# 路由仓库
public interface RouteDefinitionLocator {
Flux<RouteDefinition> getRouteDefinitions();
}
/*xxx: 路由定义仓库,后续应该会扩展为 jdbc,或者其它介质存储*/
public class InMemoryRouteDefinitionRepository implements RouteDefinitionRepository {
public Mono<Void> save(Mono<RouteDefinition> route) {
//略...
}
public Mono<Void> delete(Mono<String> routeId) {
//略...
}
}
# FilteringWebHandler处理器
/*xxx: 实际的处理器处理 */
public class FilteringWebHandler implements WebHandler {
private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
/*xxx: 将所有的全局过滤器,转换为 网关过滤适配器 */
return filters.stream().map(filter -> {
GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
if (filter instanceof Ordered) {
int order = ((Ordered) filter).getOrder();
return new OrderedGatewayFilter(gatewayFilter, order);
}
return gatewayFilter;
}).collect(Collectors.toList());
}
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
/*xxx: 获取当前路由断言信息 */
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
/*xxx: 从路由断言中,获取 网关过滤器*/
List<GatewayFilter> gatewayFilters = route.getFilters();
/*xxx: 将全局过滤器,与网关过滤器拼接在一起 */
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
// TODO: needed or cached?
/*xxx: 进行排序, 如果不遵循规范,则 全局过滤器不一定位于所有的网关过滤器之前 */
AnnotationAwareOrderComparator.sort(combined);
/*xxx: 将全局过滤器与网关过滤器进行拼接后,封装为过滤链,进行过滤*/
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
private static class DefaultGatewayFilterChain implements GatewayFilterChain {
/*xxx: 执行过滤连链流程*/
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
this.index + 1);
return filter.filter(exchange, chain);
}
else {
/*xxx: 执行到最后一个过滤器时,调用 complete回调 */
return Mono.empty(); // complete
}
});
}
}
}
# 过滤器说明
- 网上有些说法:全局过滤器在网关过滤器之前执行,这是错误的,且大错特错,误人子弟
# 全局过滤器
- 官方的全局过滤器,均实现了
ordered
接口
名称 | 顺序 | 作用 |
---|---|---|
RemoveCachedBodyFilter | Integer.MIN_VALUE | 响应结束后移除缓存 |
AdaptCachedBodyGlobalFilter | Integer.MIN_VALUE+1000 | 缓存请求实体等信息 |
NettyWriteResponseFilter | -1 | 响应结束后,写出响应体,以及异常等处理 |
WebClientWriteResponseFilter | -1(默认未启用) | 同上 |
ForwardPathFilter | 0 | 将配置的forward,写入url中 |
GatewayMetricsFilter | 0 | 监控 |
RouteToRequestUrlFilter | 10000 | 进行协议转换适配 |
LoadBalancerClientFilter | 10100 | 负载均衡,解析节点实例 |
ReactiveLoadBalancerClientFilter | 10150 | 负载均衡,解析节点实例 |
WebsocketRoutingFilter | Integer.MAX_VALUE-1 | websocket转发 |
ForwardRoutingFilter | Integer.MAX_VALUE | 站内转发,转发到requestMapping资源处理 |
NettyRoutingFilter | Integer.MAX_VALUE | 站外转发 |
WebClientHttpRoutingFilter | Integer.MAX_VALUE(未启用) | 同上 |
# 网关过滤器
- 所有的网关过滤器,均会被转为Ordered类型,即所有的网关过滤器,都需要有明确的顺序
- 通过实现Ordered可以指定具体的顺序
- 没有实现Ordered接口的网关过滤器,将会按照声明的顺序,从1开始,依次排开;
- 绝大部分情况下,网关过滤器都在下游服务处理之前处理,但也可以通过配置,实现后置处理,实际上,在之前的过滤器,也能够实现后置处理;