# 网络编程

# 概述

  • 对信息的发送与接收
  • 使用套接字来达到进程间通信

# 发展

  • 1969年诞生于美国国防部的APRAnet,标志现代计算机网络诞生;
  • 硬件+协议,是网络通信的基础.(参考计算机网络);
  • 硬件:各种传输介质,网线,光纤,无线等;
  • 协议:OSI七层协议->TCP/IP四层协议

# java网络编程

# BIO源码

  • 服务端
public
class ServerSocket implements java.io.Closeable {
    
    private SocketImpl impl;
    
    public ServerSocket() throws IOException {
        setImpl();
    }

    public void bind(SocketAddress endpoint, int backlog) throws IOException {
     	getImpl().bind(epoint.getAddress(), epoint.getPort());
        getImpl().listen(backlog);
    }
    
    private void setImpl() {
       impl = factory.createSocketImpl();
    }
    
    public Socket accept() throws IOException {
        Socket s = new Socket((SocketImpl) null);
        implAccept(s);
    }
    
    protected final void implAccept(Socket s) throws IOException {
     	SocketImpl si = null;
        getImpl().accept(si);
    }
}

class DualStackPlainSocketImpl extends AbstractPlainSocketImpl
{
 	static native int socket0(boolean stream, boolean v6Only) throws IOException;

    static native void bind0(int fd, InetAddress localAddress, int localport,
                             boolean exclBind)
        throws IOException;

    static native int connect0(int fd, InetAddress remote, int remotePort)
        throws IOException;
    
    static native void listen0(int fd, int backlog) throws IOException;

    static native int accept0(int fd, InetSocketAddress[] isaa) throws IOException;
    
    void socketAccept(SocketImpl s) throws IOException {
     	waitForNewConnection(nativefd, timeout);
        newfd = accept0(nativefd, isaa);   
    }
}
  • 客户端
public
class Socket implements java.io.Closeable {
 	SocketImpl impl;
    
     private Socket(SocketAddress address, SocketAddress localAddr,
                   boolean stream) throws IOException {
         setImpl();
         
         bind(localAddr);
         connect(address);
     }
    
    public void connect(SocketAddress endpoint, int timeout) throws IOException {
     	impl.connect(addr, port);   
    }
}

abstract class AbstractPlainSocketImpl extends SocketImpl
{
 	abstract void socketConnect(InetAddress address, int port, int timeout)
        throws IOException;   
}

class DualStackPlainSocketImpl extends AbstractPlainSocketImpl
{
 	connectResult = connect0(nativefd, address, port);   
}

# BIO实践

package com.automannn.demo;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @author automannn
 * @Date 2022/3/25
 */
public class SocketTest {
    public static void main(String[] args) throws IOException {
        Thread t1 = new Thread(()->{
            ServerSocket serverSocket = null;
            try {
                serverSocket = new ServerSocket(10001);
                /*xxx: 本质上是手动创建了一个本机的socket,并将底层接收到的socket资源句柄,设置给它*/
                Socket serverProxy= serverSocket.accept();
                InputStream inputStream= serverProxy.getInputStream();
                ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
                byte[] target = new byte[4];
                if ( inputStream.read(target)!=-1){
                    outputStream.write(target);
                }
                System.out.println(new String(outputStream.toByteArray()));
                serverProxy.getOutputStream().write("pong".getBytes());
                serverProxy.getOutputStream().flush();
                serverProxy.shutdownOutput();
                System.out.println("服务端接收消息,发送消息->应用层面会话结束");
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        t1.start();

        Thread t2 = new Thread(()->{
            try {
                Socket socket =new Socket("localhost",10001);
                socket.getOutputStream().write("ping".getBytes());
                socket.getOutputStream().flush();
                socket.shutdownOutput();

                InputStream inputStream = socket.getInputStream();
                ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
                byte[] target = new byte[4];
                if ( inputStream.read(target)!=-1){
                    outputStream.write(target);
                }
                System.out.println(new String(outputStream.toByteArray()));
                System.out.println("客户端发送消息->接收消息->应用层面会话结束");
            } catch(IOException e){
                e.printStackTrace();
            }
        });

        t2.start();

    }

}

# 注意事项

  • 每个socket都有一对输入输出流,读取时会进行阻塞,除非读到EOF,EOF可以通过shutdownInput或者shutdownOutpu更改该状态
  • 关闭流,重置流,或者关闭socket时,也会变更EOF状态;
  • shutdownInput,shutdownOutput作用是单向关闭流,不会导致socket连接断开;
  • 输入流,输出流与socket具有同等生命周期,关闭流,相当于关闭socket;
  • 输入输出流,涉及到粘包与半包,这只是通信层面
  • socket只是解决通信问题,连接一旦建立,就一直通信

# NIO源码

  • 服务端
/*xxx: 信道*/
public interface Channel extends Closeable {
    public boolean isOpen();
}

public interface NetworkChannel
    extends Channel
{
    NetworkChannel bind(SocketAddress local) throws IOException;
    
    <T> T getOption(SocketOption<T> name) throws IOException;
}

public abstract class SelectableChannel
    extends AbstractInterruptibleChannel
    implements Channel
{
    public abstract SelectorProvider provider();
}

public abstract class AbstractSelectableChannel
    extends SelectableChannel
{
 	private final SelectorProvider provider;
    
    protected AbstractSelectableChannel(SelectorProvider provider) {
        this.provider = provider;
    }
    
    protected abstract void implCloseSelectableChannel() throws IOException;
    
    public final SelectionKey register(Selector sel, int ops,
                                       Object att)
        throws ClosedChannelException
    {
        SelectionKey k = findKey(sel);
        k.interestOps(ops);
        k.attach(att);
        return k;
    }
    
}

public abstract class ServerSocketChannel
    extends AbstractSelectableChannel
    implements NetworkChannel
{
 	protected ServerSocketChannel(SelectorProvider provider) {
        super(provider);
    }
    
    public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
    }
    
    public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
    }
    
    public abstract ServerSocketChannel bind(SocketAddress local, int backlog)
        throws IOException;
    
    public abstract ServerSocket socket();
    
    public abstract SocketChannel accept() throws IOException;
}

class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl {
    public ServerSocket socket() {
          this.socket = ServerSocketAdaptor.create(this);
          return this.socket;
        }
    }
    
    public ServerSocketChannel bind(SocketAddress var1, int var2) throws IOException {
        InetSocketAddress var4 = var1 == null ? new InetSocketAddress(0) : Net.checkAddress(var1);
        Net.bind(this.fd, var4.getAddress(), var4.getPort());
        Net.listen(this.fd, var2 < 1 ? 50 : var2);
    }

	public SocketChannel accept() throws IOException {
        FileDescriptor var4 = new FileDescriptor();
        InetSocketAddress var6;
        
        SocketChannelImpl var2 = new SocketChannelImpl(this.provider(), var4, var6);
        
        return var2;
    }
    
}
public abstract class Selector implements Closeable {
 	public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }
    
    public abstract int select(long timeout)
        throws IOException;
    
    public abstract Set<SelectionKey> keys();
}

final class WindowsSelectorImpl extends SelectorImpl {
 	protected int doSelect(long var1) throws IOException {
     	int var3 = this.updateSelectedKeys();
        return var3;
    }
    
    private int updateSelectedKeys() {
     	++this.updateCount;
        byte var1 = 0;
        int var4 = var1 + this.subSelector.processSelectedKeys(this.updateCount);
        
         WindowsSelectorImpl.SelectThread var3;
        for(Iterator var2 = this.threads.iterator(); var2.hasNext(); var4 += var3.subSelector.processSelectedKeys(this.updateCount)) {
            var3 = (WindowsSelectorImpl.SelectThread)var2.next();
        }

        return var4;
    }
}
  • 客户端
public abstract class SocketChannel
    extends AbstractSelectableChannel
    implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel
{
 	public abstract boolean connect(SocketAddress remote) throws IOException;  
    
    public abstract long write(ByteBuffer[] srcs, int offset, int length)
        throws IOException;
    
    public abstract long read(ByteBuffer[] dsts, int offset, int length)
        throws IOException;
}

class SocketChannelImpl extends SocketChannel implements SelChImpl {
 	public boolean connect(SocketAddress var1) throws IOException {
        int var8 = 0;
        
     	InetSocketAddress var5 = Net.checkAddress(var1);
        
        InetAddress var9 = var5.getAddress();
        
        var8 = Net.connect(this.fd, var9, var5.getPort());
    }
    
    public int read(ByteBuffer var1) throws IOException {
        int var3 = 0;
     	var3 = IOUtil.read(this.fd, var1, -1L, nd);
    }
    
    public int write(ByteBuffer var1) throws IOException {
        int var3 = 0;
        var3 = IOUtil.write(this.fd, var1, -1L, nd);
    }
}

# NIO实践

public class NioStartTest {
	public static void main(String[] args) throws IOException, InterruptedException {

        new Thread(new NioServer()).start();

        Thread.sleep(5000);
        System.out.println("服务端启动成功,启动客户端");

        new Thread(new NioClient()).start();
    }
}
public static class NioServer implements Runnable{
 	private Selector selector;
	private ServerSocketChannel acceptorChannel;
	private volatile boolean stop;   
    public NioServer() throws IOException {
            //监听客户端连接
            acceptorChannel = ServerSocketChannel.open();
            //绑定监听端口
            acceptorChannel.bind(new InetSocketAddress(InetAddress.getByName("127.0.0.1"),8055));
            //并设置为非阻塞模式
            acceptorChannel.configureBlocking(false);
            //创建选择器
            selector= Selector.open();
            acceptorChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    @Override
    public void run() {
            while (!stop){
                    selector.select(1000);
                    Set selectedKeys = selector.selectedKeys();
                    Iterator iterator = selectedKeys.iterator();
                    SelectionKey key = null;

                    while (iterator.hasNext()){
                        key = (SelectionKey) iterator.next();
                        iterator.remove();
                        
                        handleInput(key);
                    }
           }
   }
    
    private void handleInput(SelectionKey key) throws IOException {
     	if (key.isValid()) {
         	if (key.isAcceptable()) {
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                // Add the new connection to the selector
                sc.register(selector, SelectionKey.OP_READ);
            }
            
            if (key.isReadable()) {
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                
                //应答
                doWrite(sc, currentTime);
            }
        }
    }
    
    private void doWrite(SocketChannel channel, String responseStr)
                throws IOException {
     	   if (responseStr != null && responseStr.trim().length() > 0) {
                byte[] bytes = responseStr.getBytes();
                ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
                writeBuffer.put(bytes);
                writeBuffer.flip();
                channel.write(writeBuffer);
           }
    }
}
public static class NioClient implements Runnable{
 	private Selector selector;
	private SocketChannel socketChannel;
	private volatile boolean stop;   
    
    public NioClient(){
     	selector = Selector.open();
        socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);   
    }
    
    @Override
    public void run() {
        doConnect();
        
        while (!stop) {
         	selector.select(1000);   
            Set selectedKeys = selector.selectedKeys();
            Iterator it = selectedKeys.iterator();
            SelectionKey key = null;
            while (it.hasNext()) {
             	key = (SelectionKey) it.next();
                it.remove();   
                handleInput(key);
            }
        }
    }
    
    private void doConnect() throws IOException {
            // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
            if (socketChannel.connect(new InetSocketAddress("127.0.0.1", 8055))) {
                socketChannel.register(selector, SelectionKey.OP_READ);
                doWrite(socketChannel);
            } else {
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
            }
   }
    
   private void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {
         	SocketChannel sc = (SocketChannel) key.channel();
            
            if (key.isConnectable()) {
                if (sc.finishConnect()) {
                 	sc.register(selector, SelectionKey.OP_READ);   
                }
            }
            
            if (key.isReadable()) {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    int readBytes = sc.read(readBuffer);
            } 
        }
    }
    
    private void doWrite(SocketChannel sc) throws IOException {
            byte[] req = "QUERY TIME ORDER".getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
            writeBuffer.put(req);
            writeBuffer.flip();
            sc.write(writeBuffer);
            if (!writeBuffer.hasRemaining())
                System.out.println("Send order 2 server succeed.");
    }
}

# 注意事项

  • 不管是服务端还是客户端,都引入了Selector这个概念,也是NIO的核心
  • Selector来处理读写事件,并且可以处理多个socket的读写事件
  • 可以以非阻塞的方式读取,在BIO中,除非读到了EOF状态,否则会处于阻塞状态;

# Netty网络编程

# 关键接口源码

  • 事件环组-事件环-事件执行器
/*xxx: 事件执行器组 */
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
 	/*xxx: 优雅关闭 */
    Future<?> shutdownGracefully();
    
    /*xxx: 下一个事件执行器*/
    EventExecutor next();
    
    @Override
    /*xxx: 提交任务*/
    <T> Future<T> submit(Callable<T> task);
    
    @Override
    /*xxx: 调度任务 */
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
}

/*xxx: 事件环分组*/
public interface EventLoopGroup extends EventExecutorGroup {
 	/*xxx: 在当前事件环中,注册一个信道*/
    ChannelFuture register(Channel channel);
    
    @Override
    /*xxx: 下一个 事件环*/
    EventLoop next();
}

/*xxx: 事件执行器 */
public interface EventExecutor extends EventExecutorGroup {
	/*xxx: 探测当前某个线程 是否正在执行 事件环*/
    boolean inEventLoop(Thread thread);
    
    /*xxx: 返回异步结果*/
    <V> Promise<V> newPromise();
}

/*xxx: 抽象事件执行器 */
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    @Override
    public EventExecutor next() {
        return this;
    }
}

/*xxx: 抽象事件环 */
public abstract class AbstractEventLoop extends AbstractEventExecutor implements EventLoop {
    
}

/*xxx: 单线程-事件环执行器*/
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}

/*xxx: 多线程事件环执行器*/
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
	@Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
}
/*xxx: NIO 事件环*/
public final class NioEventLoop extends SingleThreadEventLoop {
    private Selector unwrappedSelector;
    
 	Selector unwrappedSelector() {
        return unwrappedSelector;
    }   
}
  • 信道
/*xxx: 信道 */
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
 	/*xxx: 绑定的事件环 */
    EventLoop eventLoop();
    
    /*xxx: 当前信道 是否与特定的 事件环 相绑定 */
    boolean isRegistered();
    
    /*xxx: 信道管道 */
    ChannelPipeline pipeline();
}

/*xxx: 抽象信道 */
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    /*xxx: 是否与特定事件环绑定 */
    private volatile boolean registered;
    
 	@Override
    public boolean isRegistered() {
        return registered;
    }
    
    private void register0(ChannelPromise promise) {
     	doRegister();
        registered = true;   
    }
    
    protected void doRegister() throws Exception {
    }
}

/*xxx: 抽象NIO信道 */
public abstract class AbstractNioChannel extends AbstractChannel {
 	@Override
    protected void doRegister() throws Exception {
     	selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);   
    }
}
  • 信道管道
/*xxx: 信道管道 */
public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
 	/*xxx: 管道头部追加 信道处理器 */
    ChannelPipeline addFirst(String name, ChannelHandler handler);
    
    /*xxx: 管道尾部追加信道处理器 */
    ChannelPipeline addLast(String name, ChannelHandler handler);
    
    /*xxx: 管道上,某个信道处理器前,追加一个信道处理器 */
    ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
    
    /*xxx: 管道上,某个信道处理器后,追加一个信道处理器 */
    ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
}
  • 信道处理器
/*xxx: 信道处理器 */
public interface ChannelHandler {
	/*xxx: 事件追加回调*/
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    
    /*xxx: 事件移除回调 */
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
    
    @Deprecated
    /*xxx: 异常捕获事件 */
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

/*xxx: io出站 事件处理*/
public interface ChannelOutboundHandler extends ChannelHandler {
 	/*xxx: 绑定事件 */
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /*xxx: 连接事件 */
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;
    
    /*xxx: 断开连接事件 */
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /*xxx: 套接字关闭事件 */
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /*xxx: 取消信道注册事件 */
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /*xxx: 读取事件 */
    void read(ChannelHandlerContext ctx) throws Exception;

    /*xxx: 写入事件 */
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    /*xxx: 刷新事件*/
    void flush(ChannelHandlerContext ctx) throws Exception;
}

/*xxx: io入站 事件处理, 添加了一些状态变化回调,方便用户对状态变化做出反应*/
public interface ChannelInboundHandler extends ChannelHandler {
 	/*xxx: 信道注册事件 */
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;
    
    /*xxx: 信道取消注册事件 */
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
    
    /*xxx: 信道激活事件 */
    void channelActive(ChannelHandlerContext ctx) throws Exception;
    
    /*xxx: 信道取消激活事件 */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;
    
    /*xxx: 信道读事件 */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
    
    /*xxx: 信道读取完成事件 */
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
    
    /*xxx: 用户事件触发事件 */
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
}

/*xxx: 信道处理器适配器  */
public abstract class ChannelHandlerAdapter implements ChannelHandler {
 	   /*xxx: 处理器是否共享*/
    public boolean isSharable() {
    	//略
    }
}

  • 启动器
/*xxx: 抽象启动器 */
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    private volatile ChannelFactory<? extends C> channelFactory;
    
 	/*xxx: 绑定事件环 */
    public B group(EventLoopGroup group) {
     	this.group = group;   
    }
    
    /*xxx: 绑定信道 */
    public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(
                ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }
    
    /*xxx: 绑定信道处理器 */
    public B handler(ChannelHandler handler) {
        this.handler = ObjectUtil.checkNotNull(handler, "handler");
        return self();
    }
}

# 通信实践

  • 服务端
package com.automannn.demo.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

/**
 * @author automannn
 * @Date 2022/3/28
 */
public class NettyServerTest {

    public static void main(String[] args) {
        NettyServer nettyServer = new NettyServer(8701);
    }

   static class NettyServer{
        private int port;

        public NettyServer(int port){
            this.port = port;
            this.bind();
        }

        private void bind(){
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();

            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(boss,worker);
                bootstrap.channel(NioServerSocketChannel.class);

                bootstrap.option(ChannelOption.SO_BACKLOG,1024);//连接数
                bootstrap.option(ChannelOption.TCP_NODELAY,true);//不延迟,消息立即发送
                bootstrap.childOption(ChannelOption.SO_KEEPALIVE,true);//长连接

                bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline p = socketChannel.pipeline();
                        p.addLast(new NettyServerHandler());
                    }
                });

                ChannelFuture future= bootstrap.bind(port).sync();
                if(future.isSuccess()){
                    System.out.println("启动Netty服务成功,端口号:"+this.port);
                }
                future.channel().closeFuture().sync();
            }catch (Exception e){
                System.out.println("启动Netty服务异常,异常信息:"+e.getMessage());
            }finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }

   static class NettyServerHandler extends ChannelInboundHandlerAdapter{
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            String received = getMessage(buf);
            System.out.println("服务器接收到消息:"+received);

            //应答
            ctx.writeAndFlush(getSendByteBuf("automannn"));
        }

        private Object getSendByteBuf(String message) {
            byte[] req = message.getBytes(StandardCharsets.UTF_8);
            ByteBuf pingMessage = Unpooled.buffer();
            pingMessage.writeBytes(req);
            return pingMessage;
        }

        private String getMessage(ByteBuf buf) {
            byte[] con = new byte[buf.readableBytes()];
            buf.readBytes(con);
            return new String(con, StandardCharsets.UTF_8);
        }


    }
}
  • 客户端
package com.automannn.demo.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.charset.StandardCharsets;

/**
 * @author automannn
 * @Date 2022/3/28
 */
public class NettyClientTest {

    public static void main(String[] args) {
        NettyClient client = new NettyClient(8701,"localhost");
    }

    static class NettyClient {
        private int port;

        private String host;

        public NettyClient(int port, String host) {
            this.port = port;
            this.host = host;
            start();
        }

        private void start() {
            EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.group(eventLoopGroup);
                bootstrap.remoteAddress(host, port);

                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new NettyClientHandler());
                    }
                });

                ChannelFuture future = bootstrap.connect(host, port).sync();
                if (future.isSuccess()) {
                    System.out.println("=======connect server success=======");
                }
                future.channel().closeFuture().sync();

            } catch (Exception e) {

            } finally {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }

    static class NettyClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
            ByteBuf byteBuf = Unpooled.buffer();
            byteBuf.writeBytes(data);
            ctx.writeAndFlush(byteBuf);
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            String rev = getMessage(buf);
            System.out.println("客户端收到服务器数据: " + rev);
        }

        private String getMessage(ByteBuf message) {
            byte[] con = new byte[message.readableBytes()];
            message.readBytes(con);
            return new String(con, StandardCharsets.UTF_8);
        }
    }
}

# http协议实现

# 源码

  • http协议解码-编码器
/*xxx: 全双工信道处理器 */
/*xxx: 所有的操作均是委派给 上下文操作 */
public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {
}

/*xxx: 进出站 混合双工处理器 */
public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
        extends ChannelDuplexHandler {
    /*xxx: IO入站 事件处理器 */
    private I inboundHandler;
    /*xxx: IO出战 事件处理器*/
    private O outboundHandler;
}

/*xxx: http服务端解码器 */
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
        implements HttpServerUpgradeHandler.SourceCodec {
}
/*xxx: 端对端消息编码器*/
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {
    
 	@Override
    /*xxx: 写消息*/
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
     	CodecOutputList out = CodecOutputList.newInstance();
        I cast = (I) msg;
        /*xxx: 编码写入*/
        encode(ctx, cast, out);
        
        /*xxx: 写入消息体*/
        ctx.write(out.getUnsafe(0), promise);
    }
    
    /*xxx: 编码写入*/
    protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}

/*xxx: http对象编码器 */
public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageToMessageEncoder<Object> {
 	@Override
    /*xxx: 编码 */
    protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
        /*xxx: 写入头部信息*/
     	ctx.alloc().buffer(int);
        
        /*xxx: 追加消息体*/
        out.add(obj);
    }
}

/*xxx: http响应编码器*/
public class HttpResponseEncoder extends HttpObjectEncoder<HttpResponse> {
}
/*xxx: 字节转化为消息 解码器 */
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    /*xxx: 解码 */
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
    
}

/*xxx: http 对象解码器 */
public abstract class HttpObjectDecoder extends ByteToMessageDecoder {
 	 @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
     	out.add(obj);   
    }
}

/*xxx: http请求解码器 */
public class HttpRequestDecoder extends HttpObjectDecoder {
    
}

# 实践

package com.automannn.netty.test;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

/**
 * @author automannn
 * @Date 2022/4/6
 */
public class HttpServerTest {

    public static void main(String[] args) throws Exception {
        HttpServer server = new HttpServer(8081);
        server.start();
    }


    static class HttpServer{
        private int port;

        public HttpServer(int port){
            this.port = port;
        }

        public void start() throws Exception{
            ServerBootstrap bootstrap = new ServerBootstrap();
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup work = new NioEventLoopGroup();

            bootstrap.group(boss,work)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new HttpServerInitializer());

            ChannelFuture future = bootstrap.bind(new InetSocketAddress(port)).sync();
            System.out.println("http server start up on port: "+ port);
            future.channel().closeFuture().sync();
        }
    }

    static class HttpServerInitializer extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast(new HttpServerCodec());
            pipeline.addLast("httpAggregator",new HttpObjectAggregator(512*1024));//http消息聚合器
            pipeline.addLast(new HttpRequestHandler());
        }
    }

    static class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
            /*  100 Continue含义
                HTTP客户端程序有一个实体的主体部分要发送给服务器,但希望在发送之前查看下服务器是否会接受这个实体,
                所以在发送实体之前先发送了一个携带100 Continue的Expect请求首部的请求。
                服务器在收到这样的请求后,应该用 100 Continue或一条错误码来进行响应。
            */
            if(HttpUtil.is100ContinueExpected(fullHttpRequest)){
                channelHandlerContext.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.CONTINUE));
            }
            ByteBuf reqContentByteBuf =  fullHttpRequest.content();
            byte[] reqContent  = new byte[reqContentByteBuf.readableBytes()];
            reqContentByteBuf.readBytes(reqContent);

            System.out.println("=====客户端请求体=====");
            System.out.println("=====uri====: "+fullHttpRequest.uri());
            System.out.println("=====headers====: "+fullHttpRequest.headers());
            System.out.println("=====request====: "+fullHttpRequest);
            System.out.println("=====content====: "+new String(reqContent,StandardCharsets.UTF_8));


            String uri  = fullHttpRequest.uri();
            String msg = "<html><head><title>test</title></head><body>你请求uri为:" + uri+"</body></html>";
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK, Unpooled.copiedBuffer(msg, StandardCharsets.UTF_8));
            response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/html;charset=UTF-8");
            channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }
    }
}

# mqtt协议实践

# mqtt协议

  • 概述 MQTT(Message Queuing Telemetry Transport),基于发布/订阅范式的消息协议。

工作在TCP/IP协议族上,为硬件性能低下的远程设备,以及网络状况糟糕环境下,设计的发布/订阅型消息协议。

它需要一个消息中间件,主要应用于机器与机器通信以及物联网

由C/S架构组成,其中,C端分为两类角色,发布者或者订阅者; 消息分为主题(Topic)和 负载(payload)两类.

# 源码

  • MqttDecoder TODO

  • MqttEncoder TODO

# 实践

  • 服务端
package com.automannn.demo.mqtt;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.util.CollectionUtils;

import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

/**
 * @author automannn
 * @Date 2022/4/10
 */
@Slf4j
public class ServerTest {

    public static void main(String[] args) {
        MqttServer server = new MqttServer(8080);
        server.start();
    }

    static class MqttServer{


        protected ConcurrentHashMap<String, Channel> channels = new ConcurrentHashMap<>();

        private int port;

        public MqttServer(int port){
            this.port = port;
        }

        public void start() {
            ServerBootstrap bootstrap = new ServerBootstrap();
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup work = new NioEventLoopGroup();

            bootstrap.group(boss,work)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MqttServerInitializer(this));

            ChannelFuture future = null;
            try {
                future = bootstrap.bind(new InetSocketAddress(port)).sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("mqtt server start up on port: "+ port);
            try {
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public ConcurrentHashMap<String, Channel> getChannels() {
            return channels;
        }
    }



    static class MqttServerInitializer extends ChannelInitializer<SocketChannel> {

        private MqttServer server;

        public MqttServerInitializer(MqttServer server) {
            this.server = server;
        }

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline channelPipeline = socketChannel.pipeline();
            channelPipeline.addLast(new MqttDecoder());
            channelPipeline.addLast(MqttEncoder.INSTANCE);

            List<EventListener> listeners = new ArrayList<>();
            SubscribeService service = new MemorySubscribeService();
            //xxx: 添加监听器
            listeners.add(new DefaultMqttServerMessageEventListener(service,server));

            MqttEventDispatcher dispatcher = new MqttEventDispatcher(listeners);
            MqttServerDispatchHandler handler = new MqttServerDispatchHandler(dispatcher,server);

            //xxx: 添加事件分发器
            channelPipeline.addLast(handler);
        }
    }

    static class MqttServerDispatchHandler extends ChannelInboundHandlerAdapter{
        private MqttEventDispatcher dispatcher;

        private MqttServer mqttServer;

        public MqttServerDispatchHandler(MqttEventDispatcher dispatcher,MqttServer server) {
            this.dispatcher = dispatcher;
            this.mqttServer = server;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String channelId = ctx.channel().id().asShortText();
            log.debug("Message received from channel '{} : {}'.",channelId,msg);
            dispatcher.dispatchMessageEvent(ctx,ctx.channel(),msg);
            super.channelRead(ctx,msg);
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            String channelId = ctx.channel().id().asShortText();
            mqttServer.getChannels().put(channelId,ctx.channel());
            super.channelActive(ctx);
        }
    }

    static class MqttEventDispatcher{
        private List<EventListener> listeners;

        public MqttEventDispatcher(List<EventListener> listeners){
            this.listeners = listeners;
        }

        public void dispatchMessageEvent(ChannelHandlerContext ctx,Channel channel,Object msg){
            for (EventListener listener : listeners) {
                if(listener instanceof MqttMessageEventListener){
                    ((MqttMessageEventListener)listener).channelRead(ctx,channel,msg);
                }
            }
        }
    }

    static interface MqttMessageEventListener extends EventListener{
        void channelRead(ChannelHandlerContext ctx,Channel channel,Object msg);
    }

    static abstract class AbstractMqttMessageEventListener implements MqttMessageEventListener{

        @Override
        public void channelRead(ChannelHandlerContext ctx, Channel channel, Object msg) {
            if(msg instanceof MqttMessage){
                MqttMessage message = (MqttMessage) msg;
                MqttMessageType messageType = message.fixedHeader().messageType();
                switch (messageType) {
                    case CONNECT:
                        this.connect(channel, (MqttConnectMessage) message);
                        break;
                    case PUBLISH:
                        this.publish(channel, (MqttPublishMessage) message);
                        break;
                    case SUBSCRIBE:
                        this.subscribe(channel, (MqttSubscribeMessage) message);
                        break;
                    case UNSUBSCRIBE:
                        this.unSubscribe(channel, (MqttUnsubscribeMessage) message);
                        break;
                    case PINGREQ:
                        this.pingReq(channel, message);
                        break;
                    case DISCONNECT:
                        this.disConnect(channel, message);
                        break;
                    default:
                        log.debug("Nonsupport server message  type of '{}'.", messageType);
                        break;
                }
            }
        }

        public void connect(Channel channel, MqttConnectMessage msg) {

            MqttConnAckMessage okResp = (MqttConnAckMessage) MqttMessageFactory.newMessage(new MqttFixedHeader(
                            MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                    new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, true), null);
            channel.writeAndFlush(okResp);
        }

        public void pingReq(Channel channel, MqttMessage msg) {

            MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false,
                    MqttQoS.AT_MOST_ONCE, false, 0));
            channel.writeAndFlush(pingResp);
        }

        public void disConnect(Channel channel, MqttMessage msg) {
            if (channel.isActive()) {
                channel.close();
            }
        }

        public void subscribe(Channel channel, MqttSubscribeMessage msg) {
            MqttSubAckMessage subAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                    MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),
                    new MqttSubAckPayload(0));
            channel.writeAndFlush(subAckMessage);
        }

        public void unSubscribe(Channel channel, MqttUnsubscribeMessage msg) {
            MqttUnsubAckMessage unSubAckMessage = (MqttUnsubAckMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                    MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()), null);
            channel.writeAndFlush(unSubAckMessage);
        }

        public void publish(Channel channel, MqttPublishMessage msg) {

        }
    }

    static interface SubscribeService{
        void subscribe(String clientId,String channelId,String topic);

        void unSubscribe(String clientId);

        void unSubscribe(String channelId,String topic);

        List<String> getChannelByTopic(String topic);
    }

    static class MemorySubscribeService implements SubscribeService{

        private static ConcurrentHashMap<String, Map<String,String>> topicGroup = new ConcurrentHashMap<>();

        private ConcurrentLinkedDeque<String> removedClients = new ConcurrentLinkedDeque<String>();

        @Override
        public void subscribe(String clientId, String channelId, String topic) {
            if(topicGroup.containsKey(topic)){
               Map<String,String> group= topicGroup.get(topic);
               group.put(channelId,clientId);
            }else{
                Map<String,String> infos = new HashMap<>();
                infos.put(channelId,clientId);
                topicGroup.put(topic,infos);
            }
        }

        @Override
        public void unSubscribe(String clientId) {
            //剔除客户端,可能剔除多个绑定的主题
            removedClients.add(clientId);
        }

        @Override
        public void unSubscribe(String channelId,String topic) {
            if (topicGroup.containsKey(topic)){
               Map<String,String> infos =  topicGroup.get(topic);
               if (infos.containsKey(channelId)){
                   infos.remove(channelId);
               }
            }
        }

        @Override
        public List<String> getChannelByTopic(String topic) {
            if(!topicGroup.containsKey(topic)){
                return Collections.emptyList();
            }else{
                Map<String,String> infos = topicGroup.get(topic);
                List<String> result = new ArrayList<>();
                Iterator<Map.Entry<String, String>> iterator = infos.entrySet().iterator();
                while (iterator.hasNext()){
                    Map.Entry<String,String> entry = iterator.next();
                    //剔除无效的clientId
                    if(removedClients.contains(entry.getValue())){
                        removedClients.remove(entry.getValue());
                        iterator.remove();
                        continue;
                    }
                    result.add(entry.getKey());
                }
                return result;

            }
        }
    }

    static class DefaultMqttServerMessageEventListener extends AbstractMqttMessageEventListener{

       private SubscribeService subscribeService;

       private MqttServer mqttServer;

        public DefaultMqttServerMessageEventListener(SubscribeService subscribeService,MqttServer server) {
            this.subscribeService = subscribeService;
            this.mqttServer  = server;
        }

        @Override
        public void connect(Channel channel, MqttConnectMessage msg) {
            String clientId = msg.payload().clientIdentifier();
            channel.attr(AttributeKey.valueOf("clientId")).set(clientId);
            super.connect(channel, msg);
        }

        @Override
        public void disConnect(Channel channel, MqttMessage msg) {
            String channelId = channel.id().asShortText();
            try {
                subscribeService.unSubscribe(channelId);
                log.debug("取消订阅全部主题成功. channelId={}", channelId);
            } catch (Exception ex) {
                log.error("取消订阅失败.", ex);
            }

            super.disConnect(channel, msg);
        }

        @Override
        public void subscribe(Channel channel, MqttSubscribeMessage msg) {

            List<MqttTopicSubscription> topicSubscriptions = msg.payload().topicSubscriptions();
            String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
            String channelId = channel.id().asShortText();
            topicSubscriptions.forEach(topicSubscription -> {
                String topic = topicSubscription.topicName();
                MqttQoS mqttQoS = topicSubscription.qualityOfService();
                log.debug("开始订阅. clientId={}, topic={}, qos={}", clientId, topic, mqttQoS.value());
                try {
                    subscribeService.subscribe(clientId, channelId, topic);
                    log.debug("订阅主题成功. clientId={}, channelId={}, topic={}", channelId, channelId, topic);
                } catch (Exception ex) {
                    log.error("订阅失败.", ex);
                }
            });
            super.subscribe(channel, msg);
        }

        @Override
        public void unSubscribe(Channel channel, MqttUnsubscribeMessage msg) {
            String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
            List<String> topics = msg.payload().topics();
            topics.forEach(topic -> {
                try {
                    subscribeService.unSubscribe(clientId, topic);
                    log.debug("取消订阅主题成功. clientId={}, topic={}", clientId, topic);
                } catch (Exception ex) {
                    log.error("取消订阅失败.", ex);
                }
            });

            super.unSubscribe(channel, msg);
        }

        @Override
        public void publish(Channel channel, MqttPublishMessage msg) {
            String topic = msg.variableHeader().topicName();
            List<String> channels= subscribeService.getChannelByTopic(topic);
            if (!CollectionUtils.isEmpty(channels)){
                for (String channelId : channels) {
                    Channel clientChannel  = mqttServer.getChannels().get(channelId);
                    if (clientChannel!=null&& clientChannel.isActive()){

                        ByteBuf buf = msg.content().duplicate();
                        byte[] tmp = new byte[buf.readableBytes()];
                        buf.readBytes(tmp);
                        String content = new String(tmp);
                        MqttPublishMessage sendMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
                                new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0),
                                new MqttPublishVariableHeader(topic, 0),
                                Unpooled.buffer().writeBytes(new String(content.toUpperCase()).getBytes()));
                        clientChannel.writeAndFlush(sendMessage);
                    }
                }
            }
            super.publish(channel, msg);
        }
    }



}
  • 客户端
package com.automannn.demo.mqtt;

import com.sun.javafx.binding.StringFormatter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * @author automannn
 * @Date 2022/4/10
 */
@Slf4j
public class ClientTest {

    public static void main(String[] args) {
        String broker = "tcp://127.0.0.1:8080";
        String subScribeT1 = "automannn/t1";
        String subScribeT2 = "automannn/t2";
        
        String clientId1 = "customClient1";
        String clientId2 = "customClient2";
        String clientId3 = "customClient3";
        String clientId4 = "customClient4";
        String clientId5 = "customClient5";

        MqttCustomClient client1 = new MqttCustomClient(broker,clientId1,subScribeT1);
        MqttCustomClient client2 = new MqttCustomClient(broker,clientId2,subScribeT1);
        MqttCustomClient client3 = new MqttCustomClient(broker,clientId3,subScribeT2);
        MqttCustomClient client4 = new MqttCustomClient(broker,clientId4,subScribeT2);
        MqttCustomClient client5 = new MqttCustomClient(broker,clientId5,subScribeT2);

        client1.start();
        client2.start();
        client3.start();
        client4.start();
        client5.start();
        
        client1.push(subScribeT1,"test,mytopic111111");
        client1.push(subScribeT2,"test,mytopic222222");
    }
    
    static class MqttCustomClient {
       final private String broker;
       final private String clientId;
       final private String topic;
       private  MqttClient sampleClient;

        MemoryPersistence persistence;
        public MqttCustomClient(String broker, String clientId, String topic) {
            this.broker = broker;
            this.clientId = clientId;
            this.topic = topic;

        }

        public void start(){
            try {
                persistence = new MemoryPersistence();
                sampleClient = new MqttClient(broker,clientId,persistence);
                MqttConnectOptions options = new MqttConnectOptions();

                options.setServerURIs(new String[]{broker});
                options.setCleanSession(true);
                options.setKeepAliveInterval(90);
                options.setAutomaticReconnect(true);
                options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);

                sampleClient.setCallback(new MqttCallbackExtended() {
                    @Override
                    public void connectComplete(boolean b, String s) {
                        try {
                            sampleClient.subscribe(topic,0);
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void connectionLost(Throwable throwable) {
                    }

                    @Override
                    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                        log.debug("message arrived. topic={}, message={}.", topic, new String(mqttMessage.getPayload()));
                    }

                    @Override
                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                        log.debug("delivery complete. messageId={}.", iMqttDeliveryToken.getMessageId());
                    }
                });

                sampleClient.connect(options);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        public void push(String topic,String msg){
                try {
                    //此处消息体只需要传入 byte 数组即可,对于其他类型的消息,请自行完成二进制数据的转换
                    final MqttMessage message = new MqttMessage(msg.getBytes());
                    message.setQos(0);
                    /**
                     *消息发送到某个主题 Topic,所有订阅这个 Topic 的设备都能收到这个消息。
                     * 遵循 MQTT 的发布订阅规范,Topic 也可以是多级 Topic。此处设置了发送到二级 Topic
                     */
                    sampleClient.publish(topic, message);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
        }
    }
}

# http客户端

# 原生HttpUrlConnection

  • 实践
package com.automannn.demo.httpClient.urlConnection;

import com.alibaba.fastjson.JSON;
import org.springframework.util.*;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author automannn
 * @Date 2022/4/6
 */
public class HttpUrlConnectionUtil {

    public static String get(String url) {
        Assert.isTrue(!StringUtils.isEmpty(url), "parameter cannot be null!");
        String result = "";
        try {
            URL realURL = new URL(url);
            HttpURLConnection connection = (HttpURLConnection) realURL.openConnection();
            connection.setRequestMethod("GET");
            connection.setConnectTimeout(5 * 1000);
            //xxx: 开启流的时候,会隐式连接
            InputStream inputStream = connection.getInputStream();
            byte[] buf = new byte[inputStream.available()];
            StringBuilder sb = new StringBuilder();
            int length = 0;
            while ((length = inputStream.read(buf)) != -1) {
                String s = new String(buf, StandardCharsets.UTF_8);
                sb.append(s);
            }
            result = sb.toString();

            System.out.println("======连接本身就存在的属性(协议层面)=====");
            System.out.println("=====responseMessage: " + connection.getResponseMessage());
            System.out.println("=====responseCode: " + connection.getResponseCode());
            System.out.println("=====contentType: " + connection.getContentType());
            System.out.println("=====contentEncoding: " + connection.getContentEncoding());
            System.out.println("=====contentLength: " + connection.getContentLength());
            System.out.println("=====lastModified: " + connection.getLastModified());
            System.out.println("=====responseHeaders: " + connection.getHeaderFields());

            //xxx: 关闭流的时候,socket客户端信道就会关闭
            inputStream.close();
            connection.disconnect();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }

    public static String post(String url, Map<String, Object> params) {
        Assert.isTrue(!StringUtils.isEmpty(url), "url cannot be null");
        String result = "";

        try {
            URL realURL = new URL(url);
            HttpURLConnection connection = (HttpURLConnection) realURL.openConnection();

            connection.setRequestMethod("POST");
            connection.setDoOutput(true);
            connection.setDoInput(true);
            connection.setUseCaches(false);
            connection.setConnectTimeout(30000);
            connection.setReadTimeout(30000);
            connection.setRequestProperty("Content-type", "application/json;");

            OutputStream outputStream = connection.getOutputStream();
            String requestContent = "{}";
            if (!CollectionUtils.isEmpty(params)) {
                requestContent = JSON.toJSONString(params);
            }
            outputStream.write(requestContent.getBytes(StandardCharsets.UTF_8));
            outputStream.flush();
            //xxx: 与socket的流信道不同,关闭信道,不会导致socket断开
            outputStream.close();

            InputStream inputStream = connection.getInputStream();
            byte[] buf = new byte[1024];
            int length = 0;
            StringBuilder sb = new StringBuilder();
            while ((length = inputStream.read(buf)) != -1) {
                String temp = new String(buf, StandardCharsets.UTF_8);
                sb.append(temp);
            }
            result = sb.toString();
            inputStream.close();
            connection.disconnect();


        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    ;

    public static boolean downloadFile(String url, String location) {
        final AtomicBoolean flag = new AtomicBoolean(false);
        Assert.isTrue(!StringUtils.isEmpty(url), "the parameter url cannot be null!");
        Assert.isTrue(!StringUtils.isEmpty(location), "the parameter location cannot be null!");
        File parent = new File(location);
        if (!parent.exists()) {
            if (!parent.mkdir()) {
                throw new RuntimeException("the location [" + location + "] isn't existed and cannot be created.");
            }
        }
        Thread downLoadThread = new Thread(() -> {
            try {
                URL realURL = new URL(url);
                HttpURLConnection urlConnection = (HttpURLConnection) realURL.openConnection();
                InputStream inputStream = urlConnection.getInputStream();
                //文件名 可以从响应中获取
                String fileName = "testdownload.txt";
                FileOutputStream fileOutputStream = new FileOutputStream(location + File.separator + fileName);
                byte[] buf = new byte[inputStream.available()];
                int len = 0;
                while ((len = inputStream.read(buf)) != -1) {
                    fileOutputStream.write(buf);
                }
                inputStream.close();
                fileOutputStream.close();
                urlConnection.disconnect();
                flag.set(true);
            } catch (Exception e) {
                flag.set(false);
                e.printStackTrace();
            }
        });
        downLoadThread.start();
        //xxx: 同步
        try {
            downLoadThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return flag.get();
    }

    public static String uploadFile(String url,File file){
        Assert.isTrue(!StringUtils.isEmpty(url),"the parameter url: ["+url+"] cannot be empty! ");
        Assert.isTrue(!ObjectUtils.isEmpty(file),"the parameter file: ["+file+"] cannot be empty! ");
        String result = "";
        String boundary = System.currentTimeMillis()+"";
        Map<String,String> params = new HashMap<>();
        params.put("token","theToken");
        try {
            URL realUrl = new URL(url);
            HttpURLConnection connection = (HttpURLConnection) realUrl.openConnection();
            connection.setRequestMethod("POST");
            connection.addRequestProperty("Connection","Keep-Alive");
            connection.addRequestProperty("Charset","UTF-8");
            connection.addRequestProperty("Content-Type","multipart/form-data;boundary="+boundary);

            connection.setDoOutput(true);
            connection.setDoInput(true);

            connection.setUseCaches(false);
            connection.setConnectTimeout(20000);

            DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
            FileInputStream fileInputStream = new FileInputStream(file);

            dataOutputStream.writeBytes("--"+boundary+"\r\n");
            dataOutputStream.writeBytes("Content-Disposition: form-data;name=\"file\"; filename=\""+ URLEncoder.encode(file.getName(),StandardCharsets.UTF_8.toString())+"\"\r\n");
            dataOutputStream.writeBytes("\r\n");

            byte[] buf = new byte[1024];
            while ((fileInputStream.read(buf))!=-1){
                dataOutputStream.write(buf);
            }
            dataOutputStream.writeBytes("\r\n");
            dataOutputStream.writeBytes("--"+boundary+"\r\n");

            //xxx: 写常规参数
            try {
                Set<String > keySet=params.keySet();
                for (String param:keySet){
                    dataOutputStream.writeBytes("Content-Disposition: form-data; name=\""
                            +URLEncoder.encode(param,StandardCharsets.UTF_8.toString())+"\"\r\n");
                    dataOutputStream.writeBytes("\r\n");
                    String value=params.get(param);
                    dataOutputStream.writeBytes(URLEncoder.encode(value,StandardCharsets.UTF_8.toString())+"\r\n");
                    dataOutputStream.writeBytes("--"+boundary+"\r\n");
                }
            }catch (Exception e){

            }

            InputStream inputStream=connection.getInputStream();
            byte[] data=new byte[1024];
            StringBuffer sb1=new StringBuffer();
            int length=0;
            while ((length=inputStream.read(data))!=-1){
                String s=new String(data, StandardCharsets.UTF_8);
                sb1.append(s);
            }
            result=sb1.toString();
            inputStream.close();
            fileInputStream.close();
            dataOutputStream.close();
            connection.disconnect();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }
}
  • 源码 UrlConnection的实现机制核心在于:NetworkClient
package sun.net;
public class NetworkClient {
    /*xxx: socket实例*/
    protected Socket serverSocket;
    /*xxx: io出端*/
    public PrintStream serverOutput;
    /*xxx: io入端*/
    public InputStream serverInput;

    /*xxx: 连接socket */
    protected Socket doConnect(String host, int port) throws IOException, UnknownHostException {
        Socket socket = new Socket();

        socket.connect(new InetSocketAddress(host, port), this.connectTimeout);
        socket.setSoTimeout(this.readTimeout);
        
        return socket;
    }

    public void openServer(String host, int port) throws IOException, UnknownHostException {
        this.serverSocket = this.doConnect(host, port);
        this.serverOutput = new PrintStream(new BufferedOutputStream(this.serverSocket.getOutputStream()), true, encoding);

        this.serverInput = new BufferedInputStream(this.serverSocket.getInputStream());
    }
}

public class HttpClient extends NetworkClient {
    protected HttpClient(URL url, Proxy proxy, int connectionTimeout) throws IOException {
        this.openServer();
    }
}

更多内容可参考文件-URL获取文件

# 开源apache-http-client

  • 实践
package com.automannn.demo.httpClient.httpClient;

import com.alibaba.fastjson.JSON;
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.FileEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author automannn
 * @Date 2022/4/6
 */
public class ApacheHttpClientUtil {
    public static String get(String url){
        String result ="";
        Assert.isTrue(!StringUtils.isEmpty(url),"the parameter url: "+ url+" cannot be null.");
        CloseableHttpClient httpClient = HttpClients.createDefault();

        HttpGet request = new HttpGet(url);
        request.addHeader("k1","v1");
        try {
            CloseableHttpResponse response = httpClient.execute(request);
            HttpEntity entity = response.getEntity();
            if (entity!=null){
               result= EntityUtils.toString(entity);
                System.out.println(result);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }

    public static String postForm(String url, Map<String,String> params){
        Assert.isTrue(!StringUtils.isEmpty(url),"the parameter url: "+ url+" cannot be null.");
        Assert.isTrue(!ObjectUtils.isEmpty(params),"the parameter params: "+ params+" cannot be null.");
        String result = "";

        HttpPost request = new HttpPost(url);
        List<NameValuePair> urlParameters = new ArrayList<>();
        for (Map.Entry<String,String> en: params.entrySet()){
            urlParameters.add(new BasicNameValuePair(en.getKey(),en.getValue()));
        }
        try {
            request.setEntity(new UrlEncodedFormEntity(urlParameters));
            CloseableHttpClient httpClient = HttpClients.createDefault();
            CloseableHttpResponse response = httpClient.execute(request);
            result= EntityUtils.toString(response.getEntity());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    public static String postJson(String url, Map<String,String> params){
        Assert.isTrue(!StringUtils.isEmpty(url),"the parameter url: "+ url+" cannot be null.");
        Assert.isTrue(!ObjectUtils.isEmpty(params),"the parameter params: "+ params+" cannot be null.");
        String result = "";
        HttpPost request = new HttpPost(url);

        request.addHeader("content-type","application/json");
        try {
            request.setEntity(new StringEntity(JSON.toJSONString(params),ContentType.APPLICATION_JSON));
            CloseableHttpClient httpClient = HttpClients.createDefault();
            CloseableHttpResponse response = httpClient.execute(request);
            result= EntityUtils.toString(response.getEntity());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    public static boolean uploadFile(String url, File file){
        Assert.isTrue(!StringUtils.isEmpty(url),"the parameter url cannot be null.");
        Assert.isTrue(!ObjectUtils.isEmpty(file),"the parameter file cannot be null.");
        boolean flag = false;

        HttpPost request = new HttpPost(url);
        request.setEntity(new FileEntity(file));

        CloseableHttpClient httpClient = HttpClients.createDefault();
        try {
            CloseableHttpResponse response = httpClient.execute(request);
            if (response.getStatusLine().getStatusCode()==200){
                flag=true;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        return flag;
    }
}
  • 源码
/*xxx: httpClient工厂*/
public class HttpClients {
    /*xxx: 创建默认的httpClient*/
    public static CloseableHttpClient createDefault() {
        return HttpClientBuilder.create().build();
    }
}

/*xxx: httpClient构造器 */
public class HttpClientBuilder {
    /*xxx: 请求执行器 */
    private HttpRequestExecutor requestExec;

    /*xxx: 连接管理器 */
    private HttpClientConnectionManager connManager;

    /*xxx: 会话保持策略*/
    private ConnectionKeepAliveStrategy keepAliveStrategy;

    /*xxx: cookie存储器*/
    private CookieStore cookieStore;

    public static HttpClientBuilder create() {
        return new HttpClientBuilder();
    }

    public CloseableHttpClient build() {
        //xxx: 根据配置项进行构造,略
    }
}

/*xxx: http请求执行器 */
public class HttpRequestExecutor {
    
    /*xxx: 执行http请求 */
    public HttpResponse execute(
            final HttpRequest request,
            final HttpClientConnection conn,
            final HttpContext context) throws IOException, HttpException {
        HttpResponse response = doSendRequest(request, conn, context);
    }

    /*xxx: 发送请求*/
    protected HttpResponse doSendRequest(
            final HttpRequest request,
            final HttpClientConnection conn,
            final HttpContext context) throws IOException, HttpException {
        conn.sendRequestEntity((HttpEntityEnclosingRequest) request);
    }
}
/*xxx: http连接 */
public interface HttpConnection extends Closeable {
    @Override
        /*xxx: 释放资源*/
    void close() throws IOException;

    /*xxx: 连接是否开启*/
    boolean isOpen();

    /*xxx: 设置超时时间 */
    void setSocketTimeout(int timeout);

    /*xxx: 关闭连接*/
    void shutdown() throws IOException;
}

/*xxx: http客户端连接抽象 */
public interface HttpClientConnection extends HttpConnection {
    /*xxx: 响应是否可达,检查连接活跃性 */
    boolean isResponseAvailable(int timeout)
            throws IOException;

    /*xxx: 发送请求头 */
    void sendRequestHeader(HttpRequest request)
            throws HttpException, IOException;

    /*xxx: 发送请求体*/
    void sendRequestEntity(HttpEntityEnclosingRequest request)
            throws HttpException, IOException;

    /*xxx: 接收响应头*/
    HttpResponse receiveResponseHeader()
            throws HttpException, IOException;

    /*xxx: 接收响应体*/
    void receiveResponseEntity(HttpResponse response)
            throws HttpException, IOException;
    
}
  • 连接的建立流程
/*xxx: http客户端连接管理器 */
public interface HttpClientConnectionManager {
    /*xxx: 连接服务端*/
    void connect(
            HttpClientConnection conn,
            HttpRoute route,
            int connectTimeout,
            HttpContext context) throws IOException;
}

@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public class PoolingHttpClientConnectionManager
        implements HttpClientConnectionManager, ConnPoolControl<HttpRoute>, Closeable {

    /*xxx: 与服务端建立连接 */
    @Override
    public void connect(
            final HttpClientConnection managedConn,
            final HttpRoute route,
            final int connectTimeout,
            final HttpContext context) throws IOException {
        //xxx: 建立连接
        this.connectionOperator.connect(
                conn, host, route.getLocalSocketAddress(), connectTimeout, resolveSocketConfig(host), context);
    }
    
}

/*xxx: http连接操作*/
public interface HttpClientConnectionOperator {

    /*xxx: 连接*/
    void connect(
            ManagedHttpClientConnection conn,
            HttpHost host,
            InetSocketAddress localAddress,
            int connectTimeout,
            SocketConfig socketConfig,
            HttpContext context) throws IOException;

    /*xxx: 协议升级*/
    void upgrade(
            ManagedHttpClientConnection conn,
            HttpHost host,
            HttpContext context) throws IOException;

}

@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
public class DefaultHttpClientConnectionOperator implements HttpClientConnectionOperator {
    @Override
    public void connect(
            final ManagedHttpClientConnection conn,
            final HttpHost host,
            final InetSocketAddress localAddress,
            final int connectTimeout,
            final SocketConfig socketConfig,
            final HttpContext context) throws IOException {
        Socket sock = sf.connectSocket(
                connectTimeout, sock, host, remoteAddress, localAddress, context);
    }
}

/*xxx: socket连接工厂 */
public interface ConnectionSocketFactory {
    /*xxx: 连接远程socket*/
    Socket connectSocket(
            int connectTimeout,
            Socket sock,
            HttpHost host,
            InetSocketAddress remoteAddress,
            InetSocketAddress localAddress,
            HttpContext context) throws IOException;
}

@Contract(threading = ThreadingBehavior.IMMUTABLE)
public class PlainConnectionSocketFactory implements ConnectionSocketFactory {
    @Override
    public Socket connectSocket(
            final int connectTimeout,
            final Socket socket,
            final HttpHost host,
            final InetSocketAddress remoteAddress,
            final InetSocketAddress localAddress,
            final HttpContext context) throws IOException {
        final Socket sock = socket != null ? socket : createSocket(context);
        sock.connect(remoteAddress, connectTimeout);
        return sock;
    }
}

# 开源HttpTemplate

  • 案例
package com.automannn.demo.httpClient.restTemplate;

import com.alibaba.fastjson.JSON;
import org.springframework.http.HttpInputMessage;
import org.springframework.http.HttpOutputMessage;
import org.springframework.http.MediaType;
import org.springframework.http.converter.FormHttpMessageConverter;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.HttpMessageNotReadableException;
import org.springframework.http.converter.HttpMessageNotWritableException;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author automannn
 * @Date 2022/4/6
 */
public class RestTemplateUtil {
    public static String get(String url) {
        Assert.isTrue(!StringUtils.isEmpty(url), "the parameter url: " + url + " cannot be null.");

        RestTemplate restTemplate = new RestTemplate();

        String result = restTemplate.getForObject(url, String.class);

        return result;
    }

    public static String postForm(String url, Map<String, String> params) {
        Assert.isTrue(!StringUtils.isEmpty(url), "the parameter url: " + url + " cannot be null.");
        Assert.isTrue(!ObjectUtils.isEmpty(params), "the parameter params: " + params + " cannot be null.");

        RestTemplate restTemplate = new RestTemplate();

        restTemplate.getMessageConverters().add(new HttpMessageConverter<HashMap<String,String>>() {
            @Override
            public boolean canRead(Class<?> aClass, MediaType mediaType) {
                return false;
            }

            @Override
            public boolean canWrite(Class<?> aClass, MediaType mediaType) {
                return false;
            }

            @Override
            public List<MediaType> getSupportedMediaTypes() {
                return null;
            }

            @Override
            public HashMap<String, String> read(Class<? extends HashMap<String, String>> aClass, HttpInputMessage httpInputMessage) throws IOException, HttpMessageNotReadableException {
                return null;
            }

            @Override
            public void write(HashMap<String, String> stringStringHashMap, MediaType mediaType, HttpOutputMessage httpOutputMessage) throws IOException, HttpMessageNotWritableException {

            }
        });

        String result = restTemplate.postForObject(url, params, String.class);


        return result;
    }

    public static String postJson(String url, Map<String, String> params) {
        Assert.isTrue(!StringUtils.isEmpty(url), "the parameter url: " + url + " cannot be null.");
        Assert.isTrue(!ObjectUtils.isEmpty(params), "the parameter params: " + params + " cannot be null.");

        RestTemplate restTemplate = new RestTemplate();

        String result = restTemplate.postForObject(url, JSON.toJSONString(params), String.class);

        return result;
    }
}
  • 源码
/*xxx: rest操作*/
public interface RestOperations {
    @Nullable
        /*xxx: get获取指定类型的对象*/
    <T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws RestClientException;

    /*xxx: get获取指定响应实体 */
    <T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Object... uriVariables)
            throws RestClientException;

    /*xxx: head获取响应头*/
    HttpHeaders headForHeaders(String url, Object... uriVariables) throws RestClientException;

    @Nullable
        /*xxx: post获取指定类型的对象 */
    <T> T postForObject(String url, @Nullable Object request, Class<T> responseType,
                        Object... uriVariables) throws RestClientException;

    /*xxx: post获取指定类型的响应实体 */
    <T> ResponseEntity<T> postForEntity(String url, @Nullable Object request, Class<T> responseType,
                                        Object... uriVariables) throws RestClientException;

    /*xxx: put上传数据 */
    void put(String url, @Nullable Object request, Object... uriVariables) throws RestClientException;

    /*xxx: delete方法移除对象 */
    void delete(String url, Object... uriVariables) throws RestClientException;

    @Nullable
        /*xxx: 直接执行,方法通过参数进行传递*/
    <T> T execute(String url, HttpMethod method, @Nullable RequestCallback requestCallback,
                  @Nullable ResponseExtractor<T> responseExtractor, Object... uriVariables)
            throws RestClientException;
}

/*xxx: http访问器 */
public abstract class HttpAccessor {
    /*xxx: httpClient请求初始化器 */
    private final List<ClientHttpRequestInitializer> clientHttpRequestInitializers = new ArrayList<>();

    /*xxx: 客户端请求工厂,代表底层网络实现方式,默认情况下,用的 原生的 HttpUrlConnection实现 */
    private ClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();

    /*xxx: 创建客户端请求 */
    protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
        ClientHttpRequest request = getRequestFactory().createRequest(url, method);
        initialize(request);
    }

    private void initialize(ClientHttpRequest request) {
        this.clientHttpRequestInitializers.forEach(initializer -> initializer.initialize(request));
    }
}

/*xxx: http请求拦截器 */
public abstract class InterceptingHttpAccessor extends HttpAccessor {
    /*xxx: 请求拦截器列表 */
    private final List<ClientHttpRequestInterceptor> interceptors = new ArrayList<>();

    @Override
    /*xxx: 客户端请求工厂 */
    public ClientHttpRequestFactory getRequestFactory() {
        List<ClientHttpRequestInterceptor> interceptors = getInterceptors();

        /*xxx: 将拦截器,设置到 客户端请求工厂 */
        factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
    }
}

/*xxx: rest模板*/
public class RestTemplate extends InterceptingHttpAccessor implements RestOperations {
    /*xxx: 消息转换器列表 */
    private final List<HttpMessageConverter<?>> messageConverters = new ArrayList<>();

    /*xxx: 异常处理器 */
    private ResponseErrorHandler errorHandler = new DefaultResponseErrorHandler();

    /*xxx:  http请求头部提取器 */
    private final ResponseExtractor<HttpHeaders> headersExtractor = new HeadersExtractor();

    /*xxx: 使用该模板时,可以指定底层网络实现方式 */
    public RestTemplate(ClientHttpRequestFactory requestFactory) {
        this();
        setRequestFactory(requestFactory);
    }

    @Override
    public <T> ResponseEntity<T> postForEntity(String url, @Nullable Object request,
                                               Class<T> responseType, Map<String, ?> uriVariables) throws RestClientException {

        /*xxx: 请求回调 */
        RequestCallback requestCallback = httpEntityCallback(request, responseType);
        /*xxx: 响应提取器*/
        ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(responseType);
        return nonNull(execute(url, HttpMethod.POST, requestCallback, responseExtractor, uriVariables));
    }

    @Nullable
    /*xxx: 执行请求 */
    protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
                              @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
        /*xxx: 请求回调处理器,设置请求头部数据格式 */
        requestCallback.doWithRequest(request);

        /*xxx: 执行请求,获取到响应*/
        response = request.execute();
        /*xxx: 处理响应,如果有异常,则进行处理  */
        handleResponse(url, method, response);
        /*xxx: 通过响应提取器,提取结果 */
        return (responseExtractor != null ? responseExtractor.extractData(response) : null);
    }
}
/*xxx: 客户端请求 */
public interface ClientHttpRequest extends HttpRequest, HttpOutputMessage {
    /*xxx: 执行请求 */
    ClientHttpResponse execute() throws IOException;
}

/*xxx: 抽象http客户端 */
public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
    @Override
    public final ClientHttpResponse execute() throws IOException {
        assertNotExecuted();
        /*xxx: 执行请求 */
        ClientHttpResponse result = executeInternal(this.headers);
        this.executed = true;
        return result;
    }

    /*xxx: 执行请求 */
    protected abstract ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException;
}

/*xxx: 默认的客户端实现 */
final class SimpleStreamingClientHttpRequest extends AbstractClientHttpRequest {
    private final HttpURLConnection connection;

    @Override
    protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
        this.connection.connect();
        this.connection.getResponseCode();

        return new SimpleClientHttpResponse(this.connection);
    }
}
@Deprecated
/*xxx: netty4实现的异步客户端,由 工厂类 Netty4ClientHttpRequestFactory实现*/
	/*xxx: spring5由 ReactorClientHttpConnector实现 响应式开发,该功能废弃 */
class Netty4ClientHttpRequest extends AbstractAsyncClientHttpRequest implements ClientHttpRequest {
    @Override
    public ClientHttpResponse execute() throws IOException {
        return executeAsync().get();
    }

    @Override
    /*xxx: 执行请求 */
    protected ListenableFuture<ClientHttpResponse> executeInternal(final HttpHeaders headers) throws IOException {
        this.bootstrap.connect(this.uri.getHost(), getPort(this.uri)).addListener(connectionListener);
    }
}

此外,客户端还可由 okhttp3,apache-httpclient提供