# 网络编程
# 概述
- 对信息的发送与接收
- 使用套接字来达到进程间通信
# 发展
- 1969年诞生于美国国防部的ARPANET,标志现代计算机网络诞生;
- 硬件+协议,是网络通信的基础.(参考计算机网络);
- 硬件:各种传输介质,网线,光纤,无线等;
- 协议:OSI七层协议->TCP/IP四层协议
# java网络编程
# BIO源码
- 服务端
/*xxx: Abridged excerpt of a blocking server socket; every operation is delegated to a SocketImpl.
       Members referenced but not declared here (factory, getImpl()) are elided from the excerpt. */
public
class ServerSocket implements java.io.Closeable {
    /*xxx: the implementation object that performs the actual socket operations */
    private SocketImpl impl;

    /*xxx: creates an unbound server socket; only the impl is set up here */
    public ServerSocket() throws IOException {
        setImpl();
    }

    /*xxx: binds the impl to the endpoint's address and port, then starts listening with the given backlog */
    public void bind(SocketAddress endpoint, int backlog) throws IOException {
        // The excerpt used `epoint` without declaring it; the cast exposes getAddress()/getPort().
        InetSocketAddress epoint = (InetSocketAddress) endpoint;
        getImpl().bind(epoint.getAddress(), epoint.getPort());
        getImpl().listen(backlog);
    }

    /*xxx: obtains the impl from the factory */
    private void setImpl() {
        impl = factory.createSocketImpl();
    }

    /*xxx: creates an empty Socket, lets implAccept populate it, and returns it to the caller */
    public Socket accept() throws IOException {
        Socket s = new Socket((SocketImpl) null);
        implAccept(s);
        // The excerpt dropped this return although the method is declared to return a Socket.
        return s;
    }

    /*xxx: delegates the accept to the impl; how `si` is derived from `s` is elided in this excerpt */
    protected final void implAccept(Socket s) throws IOException {
        SocketImpl si = null;
        getImpl().accept(si);
    }
}
/*xxx: Abridged excerpt: socket implementation whose low-level operations are declared as static native methods. */
class DualStackPlainSocketImpl extends AbstractPlainSocketImpl
{
/*xxx: native socket creation; returns an int (presumably the new descriptor - confirm against the full source) */
static native int socket0(boolean stream, boolean v6Only) throws IOException;
/*xxx: native bind of descriptor fd to a local address and port */
static native void bind0(int fd, InetAddress localAddress, int localport,
boolean exclBind)
throws IOException;
/*xxx: native connect of descriptor fd to a remote address and port */
static native int connect0(int fd, InetAddress remote, int remotePort)
throws IOException;
/*xxx: native listen on descriptor fd with the given backlog */
static native void listen0(int fd, int backlog) throws IOException;
/*xxx: native accept on descriptor fd; returns an int that socketAccept stores in newfd */
static native int accept0(int fd, InetSocketAddress[] isaa) throws IOException;
/*xxx: accept path: first waits for a new connection, then calls the native accept.
       nativefd, timeout, newfd and isaa are not declared in this excerpt (elided). */
void socketAccept(SocketImpl s) throws IOException {
waitForNewConnection(nativefd, timeout);
newfd = accept0(nativefd, isaa);
}
}
- 客户端
/*xxx: Abridged excerpt of the client-side socket; the work is delegated to a SocketImpl. */
public
class Socket implements java.io.Closeable {
/*xxx: implementation object the connect call is forwarded to */
SocketImpl impl;
/*xxx: constructor: sets up the impl, binds the local address, then connects to the remote address.
       setImpl(), bind(...) and the one-argument connect(...) are not shown in this excerpt. */
private Socket(SocketAddress address, SocketAddress localAddr,
boolean stream) throws IOException {
setImpl();
bind(localAddr);
connect(address);
}
/*xxx: forwards the connect to the impl. NOTE(review): addr and port are not declared in this excerpt -
       presumably derived from endpoint; confirm against the full source. */
public void connect(SocketAddress endpoint, int timeout) throws IOException {
impl.connect(addr, port);
}
}
/*xxx: Abridged excerpt: SocketImpl subclass that leaves the actual connect to its subclasses. */
abstract class AbstractPlainSocketImpl extends SocketImpl
{
/*xxx: connect hook implemented by subclasses; takes the remote address, port and a timeout */
abstract void socketConnect(InetAddress address, int port, int timeout)
throws IOException;
}
class DualStackPlainSocketImpl extends AbstractPlainSocketImpl
{
connectResult = connect0(nativefd, address, port);
}
# BIO实践
package com.automannn.demo;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author automannn
* @Date 2022/3/25
*/
/**
 * Blocking-I/O ping/pong demo: a server thread and a client thread exchange one
 * fixed-size message each over a loopback TCP connection.
 */
public class SocketTest {
    /** Port shared by the demo server and client. */
    private static final int PORT = 10001;
    /** Maximum number of bytes read per message ("ping" / "pong"). */
    private static final int MESSAGE_SIZE = 4;
    /** Explicit charset so both ends encode and decode identically on every platform. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Starts the server and client threads.
     *
     * @param args unused
     * @throws IOException if the listening port cannot be bound
     */
    public static void main(String[] args) throws IOException {
        // Bind before any thread starts: the client can then never race ahead of the
        // listener and fail with "connection refused".
        ServerSocket serverSocket = new ServerSocket(PORT);

        Thread t1 = new Thread(() -> {
            // accept() essentially creates a local Socket by hand and hands it the
            // resource handle of the connection received by the lower layer.
            try (ServerSocket listener = serverSocket;
                 Socket serverProxy = listener.accept()) {
                System.out.println(readChunk(serverProxy.getInputStream()));
                serverProxy.getOutputStream().write("pong".getBytes(UTF_8));
                serverProxy.getOutputStream().flush();
                // Half-close: signals EOF to the peer without tearing the connection down.
                serverProxy.shutdownOutput();
                System.out.println("服务端接收消息,发送消息->应用层面会话结束");
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        t1.start();

        Thread t2 = new Thread(() -> {
            try (Socket socket = new Socket("localhost", PORT)) {
                socket.getOutputStream().write("ping".getBytes(UTF_8));
                socket.getOutputStream().flush();
                socket.shutdownOutput();
                System.out.println(readChunk(socket.getInputStream()));
                System.out.println("客户端发送消息->接收消息->应用层面会话结束");
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        t2.start();
    }

    /**
     * Performs a single blocking read of at most {@link #MESSAGE_SIZE} bytes and decodes
     * only the bytes that were actually received.
     *
     * @param in stream to read from
     * @return the decoded text, or an empty string if the stream was already at EOF
     * @throws IOException if the read fails
     */
    static String readChunk(InputStream in) throws IOException {
        byte[] target = new byte[MESSAGE_SIZE];
        int count = in.read(target);
        // A short read must not leak the untouched (zero) tail of the buffer into the result.
        return count == -1 ? "" : new String(target, 0, count, UTF_8);
    }
}
# 注意事项
- 每个socket都有一对输入输出流,读取时会进行阻塞,除非读到EOF,EOF可以通过`shutdownInput`或者`shutdownOutput`更改该状态
- 关闭流,重置流,或者关闭socket时,也会变更EOF状态;
- `shutdownInput`,`shutdownOutput`作用是单向关闭流,不会导致socket连接断开;
- 输入流,输出流与socket具有同等生命周期,关闭流,相当于关闭socket;
- 输入输出流,涉及到粘包与半包,这只是通信层面
- socket只是解决通信问题,连接一旦建立,就一直通信
# NIO源码
- 服务端
/*xxx: 信道*/
public interface Channel extends Closeable {
public boolean isOpen();
}
public interface NetworkChannel
extends Channel
{
NetworkChannel bind(SocketAddress local) throws IOException;
<T> T getOption(SocketOption<T> name) throws IOException;
}
public abstract class SelectableChannel
extends AbstractInterruptibleChannel
implements Channel
{
public abstract SelectorProvider provider();
}
/*xxx: Abridged excerpt: base class that stores the SelectorProvider and implements register(). */
public abstract class AbstractSelectableChannel
extends SelectableChannel
{
/*xxx: provider handed in by the subclass constructor */
private final SelectorProvider provider;
protected AbstractSelectableChannel(SelectorProvider provider) {
this.provider = provider;
}
/*xxx: close hook left to concrete channel implementations */
protected abstract void implCloseSelectableChannel() throws IOException;
/*xxx: looks up the key for the given selector, sets its interest ops and attachment, and returns it.
       NOTE(review): the excerpt does not show what happens when findKey finds no key -
       confirm against the full source. */
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
SelectionKey k = findKey(sel);
k.interestOps(ops);
k.attach(att);
return k;
}
}
public abstract class ServerSocketChannel
extends AbstractSelectableChannel
implements NetworkChannel
{
protected ServerSocketChannel(SelectorProvider provider) {
super(provider);
}
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
public abstract ServerSocketChannel bind(SocketAddress local, int backlog)
throws IOException;
public abstract ServerSocket socket();
public abstract SocketChannel accept() throws IOException;
}
class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl {
public ServerSocket socket() {
this.socket = ServerSocketAdaptor.create(this);
return this.socket;
}
}
public ServerSocketChannel bind(SocketAddress var1, int var2) throws IOException {
InetSocketAddress var4 = var1 == null ? new InetSocketAddress(0) : Net.checkAddress(var1);
Net.bind(this.fd, var4.getAddress(), var4.getPort());
Net.listen(this.fd, var2 < 1 ? 50 : var2);
}
public SocketChannel accept() throws IOException {
FileDescriptor var4 = new FileDescriptor();
InetSocketAddress var6;
SocketChannelImpl var2 = new SocketChannelImpl(this.provider(), var4, var6);
return var2;
}
}
public abstract class Selector implements Closeable {
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
public abstract int select(long timeout)
throws IOException;
public abstract Set<SelectionKey> keys();
}
/*xxx: Abridged (decompiled-style) excerpt of a selector implementation. */
final class WindowsSelectorImpl extends SelectorImpl {
/*xxx: in this excerpt the select step only returns the count produced by updateSelectedKeys() */
protected int doSelect(long var1) throws IOException {
int var3 = this.updateSelectedKeys();
return var3;
}
/*xxx: bumps updateCount, then sums processSelectedKeys() over this selector's own subSelector
       and the subSelector of every entry in this.threads */
private int updateSelectedKeys() {
++this.updateCount;
byte var1 = 0;
int var4 = var1 + this.subSelector.processSelectedKeys(this.updateCount);
WindowsSelectorImpl.SelectThread var3;
// The accumulation happens in the for-update clause, i.e. after each helper thread is fetched.
for(Iterator var2 = this.threads.iterator(); var2.hasNext(); var4 += var3.subSelector.processSelectedKeys(this.updateCount)) {
var3 = (WindowsSelectorImpl.SelectThread)var2.next();
}
return var4;
}
}
- 客户端
public abstract class SocketChannel
extends AbstractSelectableChannel
implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel
{
public abstract boolean connect(SocketAddress remote) throws IOException;
public abstract long write(ByteBuffer[] srcs, int offset, int length)
throws IOException;
public abstract long read(ByteBuffer[] dsts, int offset, int length)
throws IOException;
}
/*xxx: Abridged (decompiled-style) excerpt of the concrete socket channel. Each method shows only the
       delegating call; the return statements are elided in this excerpt. */
class SocketChannelImpl extends SocketChannel implements SelChImpl {
/*xxx: validates the address, then connects this channel's descriptor via Net.connect */
public boolean connect(SocketAddress var1) throws IOException {
int var8 = 0;
InetSocketAddress var5 = Net.checkAddress(var1);
InetAddress var9 = var5.getAddress();
var8 = Net.connect(this.fd, var9, var5.getPort());
}
/*xxx: reads from the descriptor into the buffer via IOUtil.read */
public int read(ByteBuffer var1) throws IOException {
int var3 = 0;
var3 = IOUtil.read(this.fd, var1, -1L, nd);
}
/*xxx: writes the buffer to the descriptor via IOUtil.write */
public int write(ByteBuffer var1) throws IOException {
int var3 = 0;
var3 = IOUtil.write(this.fd, var1, -1L, nd);
}
}
# NIO实践
/**
 * Selector-based NIO demo: starts a non-blocking server, then a non-blocking client that
 * sends one request and prints the server's reply.
 *
 * <p>{@code NioServer} and {@code NioClient} are nested static classes so the snippet is a
 * single compilable unit.
 */
public class NioStartTest {
    /** Charset used for every byte/String conversion so both ends agree on every platform. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    public static void main(String[] args) throws IOException, InterruptedException {
        // The server binds its port inside the constructor, before its thread starts.
        new Thread(new NioServer()).start();
        Thread.sleep(5000);
        System.out.println("服务端启动成功,启动客户端");
        new Thread(new NioClient()).start();
    }

    /** Closes a resource on a shutdown path, ignoring failures (best effort). */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            // Nothing useful can be done about a failed close during shutdown.
        }
    }

    /** Single-threaded server: one selector multiplexes the acceptor and all client channels. */
    public static class NioServer implements Runnable {
        private final Selector selector;
        private final ServerSocketChannel acceptorChannel;
        private volatile boolean stop;

        /**
         * Opens the listening channel, binds it, switches it to non-blocking mode and
         * registers it with a new selector for accept events.
         *
         * @throws IOException if the channel or selector cannot be opened or bound
         */
        public NioServer() throws IOException {
            acceptorChannel = ServerSocketChannel.open();
            acceptorChannel.bind(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 8055));
            acceptorChannel.configureBlocking(false);
            selector = Selector.open();
            acceptorChannel.register(selector, SelectionKey.OP_ACCEPT);
        }

        /** Asks the event loop to exit after the current select round. */
        public void stop() {
            this.stop = true;
        }

        @Override
        public void run() {
            try {
                while (!stop) {
                    selector.select(1000);
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        // Selected keys are not cleared by the selector; remove them ourselves.
                        iterator.remove();
                        try {
                            handleInput(key);
                        } catch (IOException e) {
                            // One broken connection must not take the whole server down.
                            key.cancel();
                            closeQuietly(key.channel());
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Closing the selector does not close registered channels, so close them first.
                for (SelectionKey key : selector.keys()) {
                    closeQuietly(key.channel());
                }
                closeQuietly(selector);
            }
        }

        /** Accepts new connections and answers readable ones with the current time. */
        private void handleInput(SelectionKey key) throws IOException {
            if (!key.isValid()) {
                return;
            }
            if (key.isAcceptable()) {
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                // A non-blocking accept returns null when no connection is pending.
                if (sc != null) {
                    sc.configureBlocking(false);
                    // Add the new connection to the selector
                    sc.register(selector, SelectionKey.OP_READ);
                }
            }
            if (key.isValid() && key.isReadable()) {
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    System.out.println("服务端收到消息: " + UTF_8.decode(readBuffer));
                    // Reply
                    String currentTime = java.time.LocalDateTime.now().toString();
                    doWrite(sc, currentTime);
                } else if (readBytes < 0) {
                    // -1 means the peer closed the connection; release the key and the channel.
                    key.cancel();
                    sc.close();
                }
                // readBytes == 0: nothing available right now, wait for the next event.
            }
        }

        /** Writes a non-blank response; a partial write of a large payload is not retried here. */
        private void doWrite(SocketChannel channel, String responseStr)
                throws IOException {
            if (responseStr != null && responseStr.trim().length() > 0) {
                channel.write(ByteBuffer.wrap(responseStr.getBytes(UTF_8)));
            }
        }
    }

    /** Single-threaded client: connects, sends one request, prints one reply, then stops. */
    public static class NioClient implements Runnable {
        private final Selector selector;
        private final SocketChannel socketChannel;
        private volatile boolean stop;

        /**
         * Opens the selector and a non-blocking socket channel.
         *
         * @throws IOException if either cannot be opened
         */
        public NioClient() throws IOException {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        }

        @Override
        public void run() {
            try {
                doConnect();
                while (!stop) {
                    selector.select(1000);
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        handleInput(key);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                closeQuietly(socketChannel);
                closeQuietly(selector);
            }
        }

        private void doConnect() throws IOException {
            // If the connect completes immediately, register for reads and send the request;
            // otherwise wait for the OP_CONNECT event.
            if (socketChannel.connect(new InetSocketAddress("127.0.0.1", 8055))) {
                socketChannel.register(selector, SelectionKey.OP_READ);
                doWrite(socketChannel);
            } else {
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
            }
        }

        private void handleInput(SelectionKey key) throws IOException {
            if (!key.isValid()) {
                return;
            }
            SocketChannel sc = (SocketChannel) key.channel();
            if (key.isConnectable()) {
                if (sc.finishConnect()) {
                    sc.register(selector, SelectionKey.OP_READ);
                    // Without this the deferred-connect path never sends the request.
                    doWrite(sc);
                }
            }
            if (key.isValid() && key.isReadable()) {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    System.out.println("客户端收到消息: " + UTF_8.decode(readBuffer));
                    // One request/response round trip is the whole demo.
                    this.stop = true;
                } else if (readBytes < 0) {
                    // The server closed the connection.
                    key.cancel();
                    sc.close();
                    this.stop = true;
                }
            }
        }

        private void doWrite(SocketChannel sc) throws IOException {
            ByteBuffer writeBuffer = ByteBuffer.wrap("QUERY TIME ORDER".getBytes(UTF_8));
            sc.write(writeBuffer);
            if (!writeBuffer.hasRemaining()) {
                System.out.println("Send order 2 server succeed.");
            }
        }
    }
}
# 注意事项
- 不管是`服务端`还是`客户端`,都引入了Selector这个概念,也是NIO的核心
- Selector来处理读写事件,并且可以处理多个socket的读写事件
- 可以以非阻塞的方式读取,在`BIO`中,除非读到了EOF状态,否则会处于阻塞状态;
# Netty网络编程
# 关键接口源码
- 事件环组-事件环-事件执行器
/*xxx: 事件执行器组 */
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
/*xxx: 优雅关闭 */
Future<?> shutdownGracefully();
/*xxx: 下一个事件执行器*/
EventExecutor next();
@Override
/*xxx: 提交任务*/
<T> Future<T> submit(Callable<T> task);
@Override
/*xxx: 调度任务 */
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
}
/*xxx: 事件环分组*/
public interface EventLoopGroup extends EventExecutorGroup {
/*xxx: 在当前事件环中,注册一个信道*/
ChannelFuture register(Channel channel);
@Override
/*xxx: 下一个 事件环*/
EventLoop next();
}
/*xxx: 事件执行器 */
public interface EventExecutor extends EventExecutorGroup {
/*xxx: 探测当前某个线程 是否正在执行 事件环*/
boolean inEventLoop(Thread thread);
/*xxx: 返回异步结果*/
<V> Promise<V> newPromise();
}
/*xxx: 抽象事件执行器 */
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
@Override
public EventExecutor next() {
return this;
}
}
/*xxx: 抽象事件环 */
public abstract class AbstractEventLoop extends AbstractEventExecutor implements EventLoop {
}
/*xxx: 单线程-事件环执行器*/
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
}
/*xxx: 多线程事件环执行器*/
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
}
/*xxx: NIO 事件环*/
public final class NioEventLoop extends SingleThreadEventLoop {
private Selector unwrappedSelector;
Selector unwrappedSelector() {
return unwrappedSelector;
}
}
- 信道
/*xxx: 信道 */
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
/*xxx: 绑定的事件环 */
EventLoop eventLoop();
/*xxx: 当前信道 是否与特定的 事件环 相绑定 */
boolean isRegistered();
/*xxx: 信道管道 */
ChannelPipeline pipeline();
}
/*xxx: abstract channel (abridged excerpt) */
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
/*xxx: whether this channel is bound to (registered with) an event loop */
private volatile boolean registered;
@Override
public boolean isRegistered() {
return registered;
}
/*xxx: runs the subclass registration hook, then marks the channel as registered.
       The handling of the checked exception declared by doRegister() is elided in this excerpt. */
private void register0(ChannelPromise promise) {
doRegister();
registered = true;
}
/*xxx: registration hook; empty here, overridden by subclasses */
protected void doRegister() throws Exception {
}
}
/*xxx: abstract NIO channel (abridged excerpt) */
public abstract class AbstractNioChannel extends AbstractChannel {
/*xxx: registers the underlying Java channel with the event loop's selector, using interest ops 0
       and this channel as the attachment; the resulting key is kept in selectionKey */
@Override
protected void doRegister() throws Exception {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
}
}
- 信道管道
/*xxx: 信道管道 */
public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
/*xxx: 管道头部追加 信道处理器 */
ChannelPipeline addFirst(String name, ChannelHandler handler);
/*xxx: 管道尾部追加信道处理器 */
ChannelPipeline addLast(String name, ChannelHandler handler);
/*xxx: 管道上,某个信道处理器前,追加一个信道处理器 */
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
/*xxx: 管道上,某个信道处理器后,追加一个信道处理器 */
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
}
- 信道处理器
/*xxx: 信道处理器 */
public interface ChannelHandler {
/*xxx: 事件追加回调*/
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
/*xxx: 事件移除回调 */
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
@Deprecated
/*xxx: 异常捕获事件 */
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
/*xxx: io出站 事件处理*/
public interface ChannelOutboundHandler extends ChannelHandler {
/*xxx: 绑定事件 */
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
/*xxx: 连接事件 */
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
/*xxx: 断开连接事件 */
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/*xxx: 套接字关闭事件 */
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/*xxx: 取消信道注册事件 */
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/*xxx: 读取事件 */
void read(ChannelHandlerContext ctx) throws Exception;
/*xxx: 写入事件 */
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
/*xxx: 刷新事件*/
void flush(ChannelHandlerContext ctx) throws Exception;
}
/*xxx: io入站 事件处理, 添加了一些状态变化回调,方便用户对状态变化做出反应*/
public interface ChannelInboundHandler extends ChannelHandler {
/*xxx: 信道注册事件 */
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
/*xxx: 信道取消注册事件 */
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
/*xxx: 信道激活事件 */
void channelActive(ChannelHandlerContext ctx) throws Exception;
/*xxx: 信道取消激活事件 */
void channelInactive(ChannelHandlerContext ctx) throws Exception;
/*xxx: 信道读事件 */
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
/*xxx: 信道读取完成事件 */
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
/*xxx: 用户事件触发事件 */
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
}
/*xxx: channel handler adapter (abridged excerpt) */
public abstract class ChannelHandlerAdapter implements ChannelHandler {
/*xxx: whether this handler is sharable; the body is omitted in this excerpt */
public boolean isSharable() {
// omitted
}
}
- 启动器
/*xxx: abstract bootstrap (abridged excerpt); the group and handler fields and the
       channelFactory(...) / self() methods are elided from the excerpt */
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    /*xxx: factory used to create the channel */
    private volatile ChannelFactory<? extends C> channelFactory;

    /*xxx: binds the event loop group */
    public B group(EventLoopGroup group) {
        this.group = group;
        // The excerpt dropped the return although the method is declared to return B;
        // self() matches what handler(...) below returns.
        return self();
    }

    /*xxx: binds the channel type by wrapping the class in a reflective channel factory */
    public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(
                ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }

    /*xxx: binds the channel handler; rejects null */
    public B handler(ChannelHandler handler) {
        this.handler = ObjectUtil.checkNotNull(handler, "handler");
        return self();
    }
}
# 通信实践
- 服务端
package com.automannn.demo.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
/**
* @author automannn
* @Date 2022/3/28
*/
/**
 * Netty demo server: prints every received message and answers with a fixed string.
 */
public class NettyServerTest {
    public static void main(String[] args) {
        // The constructor binds the port and blocks until the server channel is closed.
        new NettyServer(8701);
    }

    static class NettyServer {
        private int port;

        /**
         * @param port TCP port to listen on; the server is started immediately
         */
        public NettyServer(int port) {
            this.port = port;
            this.bind();
        }

        /** Configures the bootstrap, binds the port and blocks until the server channel closes. */
        private void bind() {
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(boss, worker);
                bootstrap.channel(NioServerSocketChannel.class);
                // Pending-connection queue length of the listening socket.
                bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
                // TCP_NODELAY applies to accepted connections, so it must be a child option;
                // set via option() it would target the listening channel instead.
                bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
                // Keep-alive probes on accepted connections.
                bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline p = socketChannel.pipeline();
                        p.addLast(new NettyServerHandler());
                    }
                });
                ChannelFuture future = bootstrap.bind(port).sync();
                if (future.isSuccess()) {
                    System.out.println("启动Netty服务成功,端口号:" + this.port);
                }
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
                System.out.println("启动Netty服务异常,异常信息:" + e.getMessage());
            } catch (Exception e) {
                System.out.println("启动Netty服务异常,异常信息:" + e.getMessage());
            } finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }

    static class NettyServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            try {
                String received = getMessage(buf);
                System.out.println("服务器接收到消息:" + received);
                // Reply
                ctx.writeAndFlush(getSendByteBuf("automannn"));
            } finally {
                // This handler consumes the message and does not pass it on, so it must
                // release the buffer itself to avoid a reference-count leak.
                buf.release();
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // Report the failure and drop the connection.
            cause.printStackTrace();
            ctx.close();
        }

        /** Encodes the reply text as UTF-8 into a new buffer. */
        private ByteBuf getSendByteBuf(String message) {
            return Unpooled.copiedBuffer(message, StandardCharsets.UTF_8);
        }

        /** Reads all readable bytes of the buffer and decodes them as UTF-8. */
        private String getMessage(ByteBuf buf) {
            byte[] con = new byte[buf.readableBytes()];
            buf.readBytes(con);
            return new String(con, StandardCharsets.UTF_8);
        }
    }
}
- 客户端
package com.automannn.demo.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.StandardCharsets;
/**
* @author automannn
* @Date 2022/3/28
*/
/**
 * Netty demo client: sends "hello" once the connection is active and prints every reply.
 */
public class NettyClientTest {
    public static void main(String[] args) {
        // The constructor connects and blocks until the channel is closed.
        new NettyClient(8701, "localhost");
    }

    static class NettyClient {
        private int port;
        private String host;

        /**
         * @param port remote TCP port
         * @param host remote host name; the connection is started immediately
         */
        public NettyClient(int port, String host) {
            this.port = port;
            this.host = host;
            start();
        }

        /** Configures the bootstrap, connects and blocks until the channel closes. */
        private void start() {
            EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.group(eventLoopGroup);
                bootstrap.remoteAddress(host, port);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new NettyClientHandler());
                    }
                });
                ChannelFuture future = bootstrap.connect(host, port).sync();
                if (future.isSuccess()) {
                    System.out.println("=======connect server success=======");
                }
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
                System.out.println("连接Netty服务异常,异常信息:" + e.getMessage());
            } catch (Exception e) {
                // Previously swallowed silently, which hid e.g. "connection refused".
                System.out.println("连接Netty服务异常,异常信息:" + e.getMessage());
            } finally {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }

    static class NettyClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // Send the greeting as soon as the connection is established.
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello", StandardCharsets.UTF_8));
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            try {
                String rev = getMessage(buf);
                System.out.println("客户端收到服务器数据: " + rev);
            } finally {
                // The message is consumed here and not passed on, so release it here.
                buf.release();
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // Report the failure and drop the connection.
            cause.printStackTrace();
            ctx.close();
        }

        /** Reads all readable bytes of the buffer and decodes them as UTF-8. */
        private String getMessage(ByteBuf message) {
            byte[] con = new byte[message.readableBytes()];
            message.readBytes(con);
            return new String(con, StandardCharsets.UTF_8);
        }
    }
}
# http协议实现
# 源码
- http协议解码-编码器
/*xxx: 全双工信道处理器 */
/*xxx: 所有的操作均是委派给 上下文操作 */
public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {
}
/*xxx: duplex handler that combines one inbound and one outbound handler (abridged excerpt) */
public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
extends ChannelDuplexHandler {
/*xxx: handler for inbound I/O events */
private I inboundHandler;
/*xxx: handler for outbound I/O events */
private O outboundHandler;
}
/*xxx: http服务端解码器 */
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
implements HttpServerUpgradeHandler.SourceCodec {
}
/*xxx: message-to-message encoder (abridged excerpt) */
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {
@Override
/*xxx: write path: casts the message, lets encode() fill the output list, then writes the first
       encoded element downstream. Handling of empty or multi-element output is elided here. */
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
CodecOutputList out = CodecOutputList.newInstance();
I cast = (I) msg;
/*xxx: encode into the output list */
encode(ctx, cast, out);
/*xxx: write the encoded message */
ctx.write(out.getUnsafe(0), promise);
}
/*xxx: encoding hook: subclasses add the encoded result(s) to out */
protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}
/*xxx: HTTP object encoder (abridged excerpt; the body is pseudo-code, not compilable as written) */
public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageToMessageEncoder<Object> {
@Override
/*xxx: encode */
protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
/*xxx: write the header information; `buffer(int)` is a placeholder for a buffer allocation whose
       size argument is elided in this excerpt */
ctx.alloc().buffer(int);
/*xxx: append the message body; `obj` is not declared in this excerpt */
out.add(obj);
}
}
/*xxx: http响应编码器*/
public class HttpResponseEncoder extends HttpObjectEncoder<HttpResponse> {
}
/*xxx: 字节转化为消息 解码器 */
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
/*xxx: 解码 */
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
}
/*xxx: http 对象解码器 */
public abstract class HttpObjectDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
out.add(obj);
}
}
/*xxx: http请求解码器 */
public class HttpRequestDecoder extends HttpObjectDecoder {
}
# 实践
package com.automannn.netty.test;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
/**
* @author automannn
* @Date 2022/4/6
*/
/**
 * Netty HTTP demo server: logs each aggregated request and answers with a small HTML page
 * that echoes the requested URI.
 */
public class HttpServerTest {
    public static void main(String[] args) throws Exception {
        HttpServer server = new HttpServer(8081);
        server.start();
    }

    static class HttpServer {
        private int port;

        public HttpServer(int port) {
            this.port = port;
        }

        /**
         * Binds the port and blocks until the server channel is closed.
         *
         * @throws Exception if binding fails or the waiting thread is interrupted
         */
        public void start() throws Exception {
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup work = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(boss, work)
                        .handler(new LoggingHandler(LogLevel.DEBUG))
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new HttpServerInitializer());
                ChannelFuture future = bootstrap.bind(new InetSocketAddress(port)).sync();
                System.out.println("http server start up on port: " + port);
                future.channel().closeFuture().sync();
            } finally {
                // Release the event loop threads on every exit path, including bind failure.
                boss.shutdownGracefully();
                work.shutdownGracefully();
            }
        }
    }

    static class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast(new HttpServerCodec());
            // HTTP message aggregator: merges message parts into one FullHttpRequest (max 512 KiB).
            pipeline.addLast("httpAggregator", new HttpObjectAggregator(512 * 1024));
            pipeline.addLast(new HttpRequestHandler());
        }
    }

    static class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
            /* Meaning of 100 Continue:
               an HTTP client that has an entity body to send, but first wants to know whether the
               server will accept it, sends a request carrying the "Expect: 100-continue" header
               before sending the entity. The server should answer with 100 Continue or an error code.
               NOTE(review): the HttpObjectAggregator earlier in the pipeline may already answer
               this expectation itself - confirm whether this branch is still reachable.
             */
            if (HttpUtil.is100ContinueExpected(fullHttpRequest)) {
                channelHandlerContext.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
            }
            ByteBuf reqContentByteBuf = fullHttpRequest.content();
            byte[] reqContent = new byte[reqContentByteBuf.readableBytes()];
            reqContentByteBuf.readBytes(reqContent);
            System.out.println("=====客户端请求体=====");
            System.out.println("=====uri====: " + fullHttpRequest.uri());
            System.out.println("=====headers====: " + fullHttpRequest.headers());
            System.out.println("=====request====: " + fullHttpRequest);
            System.out.println("=====content====: " + new String(reqContent, StandardCharsets.UTF_8));
            String uri = fullHttpRequest.uri();
            // The URI is client-controlled; escape it before embedding it in HTML.
            String msg = "<html><head><title>test</title></head><body>你请求uri为:" + escapeHtml(uri) + "</body></html>";
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(msg, StandardCharsets.UTF_8));
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");
            // Explicit length so the client does not depend on connection close to find the end.
            response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
            channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }

        /** Escapes the characters that are significant in HTML text and attribute values. */
        private static String escapeHtml(String raw) {
            StringBuilder escaped = new StringBuilder(raw.length() + 16);
            for (int i = 0; i < raw.length(); i++) {
                char c = raw.charAt(i);
                switch (c) {
                    case '&':
                        escaped.append("&amp;");
                        break;
                    case '<':
                        escaped.append("&lt;");
                        break;
                    case '>':
                        escaped.append("&gt;");
                        break;
                    case '"':
                        escaped.append("&quot;");
                        break;
                    case '\'':
                        escaped.append("&#39;");
                        break;
                    default:
                        escaped.append(c);
                        break;
                }
            }
            return escaped.toString();
        }
    }
}
# mqtt协议实践
# mqtt协议
- 概述 MQTT(Message Queuing Telemetry Transport),基于发布/订阅范式的消息协议。
工作在TCP/IP协议族上,为硬件性能低下的远程设备,以及网络状况糟糕环境下,设计的发布/订阅型消息协议。
它需要一个消息中间件,主要应用于机器与机器通信以及物联网。
由C/S架构组成,其中,C端分为两类角色,发布者或者订阅者; 消息由主题(Topic)和负载(payload)两部分组成.
# 源码
MqttDecoder TODO
MqttEncoder TODO
# 实践
- 服务端
package com.automannn.demo.mqtt;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.util.CollectionUtils;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
/**
* @author automannn
* @Date 2022/4/10
*/
@Slf4j
public class ServerTest {
public static void main(String[] args) {
MqttServer server = new MqttServer(8080);
server.start();
}
/**
 * Netty-based MQTT demo server; also holds the registry of channels keyed by short channel id.
 */
static class MqttServer {
    /** Channels keyed by their short channel id; handed out through {@link #getChannels()}. */
    protected ConcurrentHashMap<String, Channel> channels = new ConcurrentHashMap<>();
    private int port;

    public MqttServer(int port) {
        this.port = port;
    }

    /**
     * Binds the port and blocks until the server channel is closed. The event loop groups are
     * shut down on every exit path.
     */
    public void start() {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, work)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MqttServerInitializer(this));
            // Keeping bind and wait in one try block means an interrupted bind can no longer
            // leave a null future to be dereferenced afterwards.
            ChannelFuture future = bootstrap.bind(new InetSocketAddress(port)).sync();
            System.out.println("mqtt server start up on port: " + port);
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }

    /** @return the live channel registry (not a copy) */
    public ConcurrentHashMap<String, Channel> getChannels() {
        return channels;
    }
}
/**
 * Builds the pipeline of every accepted connection: MQTT decoder, MQTT encoder, then the
 * dispatching handler that forwards messages to the event listeners.
 */
static class MqttServerInitializer extends ChannelInitializer<SocketChannel> {
    private MqttServer server;

    public MqttServerInitializer(MqttServer server) {
        this.server = server;
    }

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new MqttDecoder());
        pipeline.addLast(MqttEncoder.INSTANCE);

        // NOTE(review): a new MemorySubscribeService is created for every channel; if it keeps
        // its subscriptions in instance state they are not shared between connections - confirm.
        SubscribeService subscribeService = new MemorySubscribeService();

        // Register the listeners.
        List<EventListener> eventListeners = new ArrayList<>();
        eventListeners.add(new DefaultMqttServerMessageEventListener(subscribeService, server));

        // Add the event dispatcher as the last handler.
        MqttEventDispatcher eventDispatcher = new MqttEventDispatcher(eventListeners);
        pipeline.addLast(new MqttServerDispatchHandler(eventDispatcher, server));
    }
}
/**
 * Inbound handler that tracks each channel in the server's registry and hands every received
 * message to the event dispatcher.
 */
static class MqttServerDispatchHandler extends ChannelInboundHandlerAdapter {
    private MqttEventDispatcher dispatcher;
    private MqttServer mqttServer;

    public MqttServerDispatchHandler(MqttEventDispatcher dispatcher, MqttServer server) {
        this.dispatcher = dispatcher;
        this.mqttServer = server;
    }

    /** Logs the message, dispatches it to the listeners, then passes it on down the pipeline. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String channelId = ctx.channel().id().asShortText();
        log.debug("Message received from channel '{} : {}'.", channelId, msg);
        dispatcher.dispatchMessageEvent(ctx, ctx.channel(), msg);
        super.channelRead(ctx, msg);
    }

    /** Adds the channel to the server's registry under its short id. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String channelId = ctx.channel().id().asShortText();
        mqttServer.getChannels().put(channelId, ctx.channel());
        super.channelActive(ctx);
    }

    /** Removes the channel from the registry so closed connections do not accumulate there. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        String channelId = ctx.channel().id().asShortText();
        mqttServer.getChannels().remove(channelId);
        super.channelInactive(ctx);
    }
}
/**
 * Fans a received message out to every registered listener that implements
 * {@code MqttMessageEventListener}; other listener types are skipped.
 */
static class MqttEventDispatcher {
    private List<EventListener> listeners;

    public MqttEventDispatcher(List<EventListener> listeners) {
        this.listeners = listeners;
    }

    /**
     * Delivers {@code msg} to the message listeners in registration order.
     *
     * @param ctx     handler context the message arrived on
     * @param channel channel the message arrived on
     * @param msg     the received message
     */
    public void dispatchMessageEvent(ChannelHandlerContext ctx, Channel channel, Object msg) {
        listeners.stream()
                .filter(MqttMessageEventListener.class::isInstance)
                .map(MqttMessageEventListener.class::cast)
                .forEach(messageListener -> messageListener.channelRead(ctx, channel, msg));
    }
}
/** Listener for messages read from a channel; MqttEventDispatcher calls it for every received message. */
static interface MqttMessageEventListener extends EventListener{
/** Invoked with the handler context, the channel and the message that was read. */
void channelRead(ChannelHandlerContext ctx,Channel channel,Object msg);
}
/**
 * Base listener that routes a decoded {@link MqttMessage} to one method per MQTT packet type and
 * provides the default acknowledgement for each of them. Subclasses override the per-type methods
 * and call {@code super} to send the acknowledgement.
 */
static abstract class AbstractMqttMessageEventListener implements MqttMessageEventListener {

    /**
     * Dispatches on the packet type of {@code msg}; objects that are not MQTT messages are ignored.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Channel channel, Object msg) {
        if (!(msg instanceof MqttMessage)) {
            return;
        }
        MqttMessage message = (MqttMessage) msg;
        MqttFixedHeader fixedHeader = message.fixedHeader();
        // A packet the decoder could not parse has a failed decoder result and may have no fixed
        // header or the wrong concrete class; dispatching it would end in an NPE/ClassCastException.
        if (fixedHeader == null || message.decoderResult().isFailure()) {
            log.debug("Ignoring undecodable MQTT message: {}", message);
            return;
        }
        MqttMessageType messageType = fixedHeader.messageType();
        switch (messageType) {
            case CONNECT:
                this.connect(channel, (MqttConnectMessage) message);
                break;
            case PUBLISH:
                this.publish(channel, (MqttPublishMessage) message);
                break;
            case SUBSCRIBE:
                this.subscribe(channel, (MqttSubscribeMessage) message);
                break;
            case UNSUBSCRIBE:
                this.unSubscribe(channel, (MqttUnsubscribeMessage) message);
                break;
            case PINGREQ:
                this.pingReq(channel, message);
                break;
            case DISCONNECT:
                this.disConnect(channel, message);
                break;
            default:
                log.debug("Nonsupport server message type of '{}'.", messageType);
                break;
        }
    }

    /** Answers CONNECT with an accepting CONNACK. */
    public void connect(Channel channel, MqttConnectMessage msg) {
        // No session state is kept by this listener, so "session present" must be reported as false.
        MqttConnAckMessage okResp = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false),
                null);
        channel.writeAndFlush(okResp);
    }

    /** Answers PINGREQ with PINGRESP. */
    public void pingReq(Channel channel, MqttMessage msg) {
        MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false,
                MqttQoS.AT_MOST_ONCE, false, 0));
        channel.writeAndFlush(pingResp);
    }

    /** Closes the channel on DISCONNECT if it is still active. */
    public void disConnect(Channel channel, MqttMessage msg) {
        if (channel.isActive()) {
            channel.close();
        }
    }

    /** Answers SUBSCRIBE with a SUBACK granting QoS 0 for every requested topic filter. */
    public void subscribe(Channel channel, MqttSubscribeMessage msg) {
        // SUBACK must carry one return code per topic filter of the SUBSCRIBE, in the same order;
        // a zero-filled array grants QoS 0 for each of them.
        int[] grantedQos = new int[msg.payload().topicSubscriptions().size()];
        MqttSubAckMessage subAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),
                new MqttSubAckPayload(grantedQos));
        channel.writeAndFlush(subAckMessage);
    }

    /** Answers UNSUBSCRIBE with an UNSUBACK that echoes the message id. */
    public void unSubscribe(Channel channel, MqttUnsubscribeMessage msg) {
        MqttUnsubAckMessage unSubAckMessage = (MqttUnsubAckMessage) MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()), null);
        channel.writeAndFlush(unSubAckMessage);
    }

    /** Hook for PUBLISH; does nothing by default. */
    public void publish(Channel channel, MqttPublishMessage msg) {
    }
}
/** Bookkeeping of which channel (and client) is subscribed to which topic. */
interface SubscribeService {

    /** Registers the channel {@code channelId}, owned by {@code clientId}, as a subscriber of {@code topic}. */
    void subscribe(String clientId, String channelId, String topic);

    /** Drops the subscriptions of the client {@code clientId}. */
    void unSubscribe(String clientId);

    /** Drops the subscription of the channel {@code channelId} to {@code topic}. */
    void unSubscribe(String channelId, String topic);

    /** Returns the ids of the channels subscribed to {@code topic}. */
    List<String> getChannelByTopic(String topic);
}
/**
 * In-memory {@link SubscribeService}.
 *
 * <p>Subscriptions are kept as {@code topic -> (channelId -> clientId)}. The map is static so that
 * all instances of this class see the same subscriptions.
 */
static class MemorySubscribeService implements SubscribeService {
    private static final ConcurrentHashMap<String, Map<String, String>> topicGroup = new ConcurrentHashMap<>();

    /** Adds (or replaces) the subscription of {@code channelId} to {@code topic}. */
    @Override
    public void subscribe(String clientId, String channelId, String topic) {
        // computeIfAbsent makes "create the group if missing" atomic; the synchronized wrapper
        // protects the per-topic map, which is touched from several connections.
        topicGroup.computeIfAbsent(topic, key -> Collections.synchronizedMap(new HashMap<String, String>()))
                .put(channelId, clientId);
    }

    /**
     * Removes every subscription held by {@code clientId}, on all topics. A {@code null} id is a no-op.
     */
    @Override
    public void unSubscribe(String clientId) {
        if (clientId == null) {
            return;
        }
        // Removing eagerly from the shared map makes the change visible to every instance and
        // covers all topics the client was bound to.
        for (Map<String, String> group : topicGroup.values()) {
            group.values().removeIf(owner -> clientId.equals(owner));
        }
    }

    /** Removes the subscription of {@code channelId} to {@code topic}, if there is one. */
    @Override
    public void unSubscribe(String channelId, String topic) {
        Map<String, String> group = topicGroup.get(topic);
        if (group != null) {
            group.remove(channelId);
        }
    }

    /**
     * @return a snapshot of the channel ids subscribed to {@code topic}; empty if the topic is unknown
     */
    @Override
    public List<String> getChannelByTopic(String topic) {
        Map<String, String> group = topicGroup.get(topic);
        if (group == null) {
            return Collections.emptyList();
        }
        // Copying through the synchronized key set yields a consistent snapshot.
        return new ArrayList<>(group.keySet());
    }
}
/**
 * Default server-side listener: remembers the client id of each connection, maintains the
 * subscriptions in a {@link SubscribeService}, and forwards published messages to the channels
 * subscribed to the topic.
 */
static class DefaultMqttServerMessageEventListener extends AbstractMqttMessageEventListener {
    /** Channel attribute under which the client identifier from CONNECT is stored. */
    private static final AttributeKey<String> CLIENT_ID = AttributeKey.valueOf("clientId");

    private final SubscribeService subscribeService;
    private final MqttServer mqttServer;

    public DefaultMqttServerMessageEventListener(SubscribeService subscribeService, MqttServer server) {
        this.subscribeService = subscribeService;
        this.mqttServer = server;
    }

    /** Stores the client identifier on the channel, then sends the default CONNACK. */
    @Override
    public void connect(Channel channel, MqttConnectMessage msg) {
        String clientId = msg.payload().clientIdentifier();
        channel.attr(CLIENT_ID).set(clientId);
        super.connect(channel, msg);
    }

    /** Drops all subscriptions of the disconnecting client, then closes the channel. */
    @Override
    public void disConnect(Channel channel, MqttMessage msg) {
        String channelId = channel.id().asShortText();
        // unSubscribe(String) is keyed by client id, so the id stored at CONNECT is passed,
        // not the channel id.
        String clientId = channel.attr(CLIENT_ID).get();
        try {
            subscribeService.unSubscribe(clientId);
            log.debug("取消订阅全部主题成功. channelId={}", channelId);
        } catch (Exception ex) {
            log.error("取消订阅失败.", ex);
        }
        super.disConnect(channel, msg);
    }

    /** Registers every requested topic for this channel, then sends the default SUBACK. */
    @Override
    public void subscribe(Channel channel, MqttSubscribeMessage msg) {
        List<MqttTopicSubscription> topicSubscriptions = msg.payload().topicSubscriptions();
        String clientId = channel.attr(CLIENT_ID).get();
        String channelId = channel.id().asShortText();
        topicSubscriptions.forEach(topicSubscription -> {
            String topic = topicSubscription.topicName();
            MqttQoS mqttQoS = topicSubscription.qualityOfService();
            log.debug("开始订阅. clientId={}, topic={}, qos={}", clientId, topic, mqttQoS.value());
            try {
                subscribeService.subscribe(clientId, channelId, topic);
                log.debug("订阅主题成功. clientId={}, channelId={}, topic={}", clientId, channelId, topic);
            } catch (Exception ex) {
                log.error("订阅失败.", ex);
            }
        });
        super.subscribe(channel, msg);
    }

    /** Removes this channel's subscription to every listed topic, then sends the default UNSUBACK. */
    @Override
    public void unSubscribe(Channel channel, MqttUnsubscribeMessage msg) {
        String clientId = channel.attr(CLIENT_ID).get();
        // unSubscribe(String, String) is keyed by channel id (the same key subscribe() stores).
        String channelId = channel.id().asShortText();
        List<String> topics = msg.payload().topics();
        topics.forEach(topic -> {
            try {
                subscribeService.unSubscribe(channelId, topic);
                log.debug("取消订阅主题成功. clientId={}, topic={}", clientId, topic);
            } catch (Exception ex) {
                log.error("取消订阅失败.", ex);
            }
        });
        super.unSubscribe(channel, msg);
    }

    /**
     * Forwards the payload, upper-cased, as a QoS 0 PUBLISH to every active channel subscribed
     * to the message's topic.
     */
    @Override
    public void publish(Channel channel, MqttPublishMessage msg) {
        String topic = msg.variableHeader().topicName();
        List<String> channels = subscribeService.getChannelByTopic(topic);
        if (!CollectionUtils.isEmpty(channels)) {
            // Read the payload once without moving the buffer's reader index, and convert with an
            // explicit charset/locale so the result does not depend on the host's defaults.
            ByteBuf content = msg.content();
            byte[] raw = new byte[content.readableBytes()];
            content.getBytes(content.readerIndex(), raw);
            byte[] payload = new String(raw, java.nio.charset.StandardCharsets.UTF_8)
                    .toUpperCase(java.util.Locale.ROOT)
                    .getBytes(java.nio.charset.StandardCharsets.UTF_8);
            for (String channelId : channels) {
                Channel clientChannel = mqttServer.getChannels().get(channelId);
                if (clientChannel != null && clientChannel.isActive()) {
                    // Each outgoing message gets its own buffer.
                    MqttPublishMessage sendMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
                            new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0),
                            new MqttPublishVariableHeader(topic, 0),
                            Unpooled.copiedBuffer(payload));
                    clientChannel.writeAndFlush(sendMessage);
                }
            }
        }
        super.publish(channel, msg);
    }
}
}
- 客户端
package com.automannn.demo.mqtt;
import com.sun.javafx.binding.StringFormatter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* @author automannn
* @Date 2022/4/10
*/
@Slf4j
public class ClientTest {
public static void main(String[] args) {
String broker = "tcp://127.0.0.1:8080";
String subScribeT1 = "automannn/t1";
String subScribeT2 = "automannn/t2";
String clientId1 = "customClient1";
String clientId2 = "customClient2";
String clientId3 = "customClient3";
String clientId4 = "customClient4";
String clientId5 = "customClient5";
MqttCustomClient client1 = new MqttCustomClient(broker,clientId1,subScribeT1);
MqttCustomClient client2 = new MqttCustomClient(broker,clientId2,subScribeT1);
MqttCustomClient client3 = new MqttCustomClient(broker,clientId3,subScribeT2);
MqttCustomClient client4 = new MqttCustomClient(broker,clientId4,subScribeT2);
MqttCustomClient client5 = new MqttCustomClient(broker,clientId5,subScribeT2);
client1.start();
client2.start();
client3.start();
client4.start();
client5.start();
client1.push(subScribeT1,"test,mytopic111111");
client1.push(subScribeT2,"test,mytopic222222");
}
static class MqttCustomClient {
final private String broker;
final private String clientId;
final private String topic;
private MqttClient sampleClient;
MemoryPersistence persistence;
public MqttCustomClient(String broker, String clientId, String topic) {
this.broker = broker;
this.clientId = clientId;
this.topic = topic;
}
public void start(){
try {
persistence = new MemoryPersistence();
sampleClient = new MqttClient(broker,clientId,persistence);
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{broker});
options.setCleanSession(true);
options.setKeepAliveInterval(90);
options.setAutomaticReconnect(true);
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);
sampleClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean b, String s) {
try {
sampleClient.subscribe(topic,0);
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void connectionLost(Throwable throwable) {
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
log.debug("message arrived. topic={}, message={}.", topic, new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.debug("delivery complete. messageId={}.", iMqttDeliveryToken.getMessageId());
}
});
sampleClient.connect(options);
} catch (MqttException e) {
e.printStackTrace();
}
}
public void push(String topic,String msg){
try {
//此处消息体只需要传入 byte 数组即可,对于其他类型的消息,请自行完成二进制数据的转换
final MqttMessage message = new MqttMessage(msg.getBytes());
message.setQos(0);
/**
*消息发送到某个主题 Topic,所有订阅这个 Topic 的设备都能收到这个消息。
* 遵循 MQTT 的发布订阅规范,Topic 也可以是多级 Topic。此处设置了发送到二级 Topic
*/
sampleClient.publish(topic, message);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
# http客户端
# 原生HttpUrlConnection
- 实践
package com.automannn.demo.httpClient.urlConnection;
import com.alibaba.fastjson.JSON;
import org.springframework.util.*;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author automannn
* @Date 2022/4/6
*/
public class HttpUrlConnectionUtil {

    /**
     * Issues a GET request and returns the response body decoded as UTF-8. Response metadata is
     * printed to stdout for demonstration.
     *
     * @param url the URL to fetch; must not be empty
     * @return the response body, or an empty string if the request failed
     */
    public static String get(String url) {
        Assert.isTrue(!StringUtils.isEmpty(url), "parameter cannot be null!");
        String result = "";
        HttpURLConnection connection = null;
        try {
            connection = (HttpURLConnection) new URL(url).openConnection();
            connection.setRequestMethod("GET");
            connection.setConnectTimeout(5 * 1000);
            // Opening the stream implicitly connects.
            try (InputStream inputStream = connection.getInputStream()) {
                result = readAll(inputStream);
            }
            System.out.println("======连接本身就存在的属性(协议层面)=====");
            System.out.println("=====responseMessage: " + connection.getResponseMessage());
            System.out.println("=====responseCode: " + connection.getResponseCode());
            System.out.println("=====contentType: " + connection.getContentType());
            System.out.println("=====contentEncoding: " + connection.getContentEncoding());
            System.out.println("=====contentLength: " + connection.getContentLength());
            System.out.println("=====lastModified: " + connection.getLastModified());
            System.out.println("=====responseHeaders: " + connection.getHeaderFields());
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
        return result;
    }

    /**
     * POSTs {@code params} as a JSON object ("{}" when empty) and returns the response body.
     *
     * @param url    the target URL; must not be empty
     * @param params values serialised as the JSON request body; may be null or empty
     * @return the response body decoded as UTF-8, or an empty string if the request failed
     */
    public static String post(String url, Map<String, Object> params) {
        Assert.isTrue(!StringUtils.isEmpty(url), "url cannot be null");
        String result = "";
        HttpURLConnection connection = null;
        try {
            connection = (HttpURLConnection) new URL(url).openConnection();
            connection.setRequestMethod("POST");
            connection.setDoOutput(true);
            connection.setDoInput(true);
            connection.setUseCaches(false);
            connection.setConnectTimeout(30000);
            connection.setReadTimeout(30000);
            connection.setRequestProperty("Content-type", "application/json;charset=UTF-8");
            String requestContent = "{}";
            if (!CollectionUtils.isEmpty(params)) {
                requestContent = JSON.toJSONString(params);
            }
            // Unlike a raw socket stream, closing this stream does not close the connection.
            try (OutputStream outputStream = connection.getOutputStream()) {
                outputStream.write(requestContent.getBytes(StandardCharsets.UTF_8));
                outputStream.flush();
            }
            try (InputStream inputStream = connection.getInputStream()) {
                result = readAll(inputStream);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
        return result;
    }

    /**
     * Downloads {@code url} into the directory {@code location} as {@code testdownload.txt}.
     *
     * @param url      the URL to download; must not be empty
     * @param location target directory; created (including parents) if missing
     * @return {@code true} if the file was written completely, {@code false} on any failure
     * @throws RuntimeException if the target directory does not exist and cannot be created
     */
    public static boolean downloadFile(String url, String location) {
        Assert.isTrue(!StringUtils.isEmpty(url), "the parameter url cannot be null!");
        Assert.isTrue(!StringUtils.isEmpty(location), "the parameter location cannot be null!");
        File parent = new File(location);
        if (!parent.exists() && !parent.mkdirs()) {
            throw new RuntimeException("the location [" + location + "] isn't existed and cannot be created.");
        }
        // The file name could also be taken from the response headers.
        String fileName = "testdownload.txt";
        HttpURLConnection urlConnection = null;
        try {
            urlConnection = (HttpURLConnection) new URL(url).openConnection();
            try (InputStream inputStream = urlConnection.getInputStream();
                 FileOutputStream fileOutputStream = new FileOutputStream(new File(parent, fileName))) {
                copy(inputStream, fileOutputStream);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        } finally {
            if (urlConnection != null) {
                urlConnection.disconnect();
            }
        }
    }

    /**
     * Uploads {@code file} as a multipart/form-data POST (part name {@code file}) together with a
     * fixed {@code token} form field, and returns the response body.
     *
     * @param url  the target URL; must not be empty
     * @param file the file to upload; must not be null
     * @return the response body decoded as UTF-8, or an empty string if the request failed
     */
    public static String uploadFile(String url, File file) {
        Assert.isTrue(!StringUtils.isEmpty(url), "the parameter url: [" + url + "] cannot be empty! ");
        Assert.isTrue(!ObjectUtils.isEmpty(file), "the parameter file: [" + file + "] cannot be empty! ");
        String result = "";
        String boundary = System.currentTimeMillis() + "";
        Map<String, String> params = new HashMap<>();
        params.put("token", "theToken");
        String utf8 = StandardCharsets.UTF_8.toString();
        HttpURLConnection connection = null;
        try {
            connection = (HttpURLConnection) new URL(url).openConnection();
            connection.setRequestMethod("POST");
            connection.addRequestProperty("Connection", "Keep-Alive");
            connection.addRequestProperty("Charset", "UTF-8");
            connection.addRequestProperty("Content-Type", "multipart/form-data;boundary=" + boundary);
            connection.setDoOutput(true);
            connection.setDoInput(true);
            connection.setUseCaches(false);
            connection.setConnectTimeout(20000);
            try (FileInputStream fileInputStream = new FileInputStream(file);
                 DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream())) {
                // File part.
                dataOutputStream.writeBytes("--" + boundary + "\r\n");
                dataOutputStream.writeBytes("Content-Disposition: form-data;name=\"file\"; filename=\""
                        + URLEncoder.encode(file.getName(), utf8) + "\"\r\n");
                dataOutputStream.writeBytes("\r\n");
                copy(fileInputStream, dataOutputStream);
                dataOutputStream.writeBytes("\r\n");
                // Regular form fields: each part starts with its own delimiter line.
                for (Map.Entry<String, String> param : params.entrySet()) {
                    dataOutputStream.writeBytes("--" + boundary + "\r\n");
                    dataOutputStream.writeBytes("Content-Disposition: form-data; name=\""
                            + URLEncoder.encode(param.getKey(), utf8) + "\"\r\n");
                    dataOutputStream.writeBytes("\r\n");
                    dataOutputStream.writeBytes(URLEncoder.encode(param.getValue(), utf8) + "\r\n");
                }
                // A multipart body must end with the closing delimiter "--boundary--".
                dataOutputStream.writeBytes("--" + boundary + "--\r\n");
                dataOutputStream.flush();
            }
            try (InputStream inputStream = connection.getInputStream()) {
                result = readAll(inputStream);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
        return result;
    }

    /** Reads the stream to its end and decodes all bytes at once as UTF-8. */
    private static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        copy(in, collected);
        // Decoding after the full read keeps multi-byte characters intact across chunk boundaries.
        return new String(collected.toByteArray(), StandardCharsets.UTF_8);
    }

    /** Copies {@code in} to {@code out}, writing only the bytes actually read in each chunk. */
    private static void copy(InputStream in, OutputStream out) throws IOException {
        byte[] buf = new byte[8192];
        int read;
        while ((read = in.read(buf)) != -1) {
            out.write(buf, 0, read);
        }
    }
}
- 源码
UrlConnection的实现机制核心在于:
NetworkClient
package sun.net;
/*xxx: excerpt of sun.net.NetworkClient; connectTimeout, readTimeout and encoding are used below but their declarations are not part of this excerpt */
public class NetworkClient {
/*xxx: the socket instance */
protected Socket serverSocket;
/*xxx: output side of the connection */
public PrintStream serverOutput;
/*xxx: input side of the connection */
public InputStream serverInput;
/*xxx: connect the socket: create it, connect to host:port using connectTimeout, then apply readTimeout via setSoTimeout */
protected Socket doConnect(String host, int port) throws IOException, UnknownHostException {
Socket socket = new Socket();
socket.connect(new InetSocketAddress(host, port), this.connectTimeout);
socket.setSoTimeout(this.readTimeout);
return socket;
}
/*xxx: connect through doConnect, then wrap the socket's output and input streams in buffered streams */
public void openServer(String host, int port) throws IOException, UnknownHostException {
this.serverSocket = this.doConnect(host, port);
this.serverOutput = new PrintStream(new BufferedOutputStream(this.serverSocket.getOutputStream()), true, encoding);
this.serverInput = new BufferedInputStream(this.serverSocket.getInputStream());
}
}
/*xxx: excerpt: the HTTP client builds on NetworkClient; the constructor is shown opening the server connection via openServer() */
public class HttpClient extends NetworkClient {
protected HttpClient(URL url, Proxy proxy, int connectionTimeout) throws IOException {
this.openServer();
}
}
更多内容可参考文件-URL获取文件
# 开源apache-http-client
- 实践
package com.automannn.demo.httpClient.httpClient;
import com.alibaba.fastjson.JSON;
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.FileEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @author automannn
* @Date 2022/4/6
*/
public class ApacheHttpClientUtil {
public static String get(String url){
String result ="";
Assert.isTrue(!StringUtils.isEmpty(url),"the parameter url: "+ url+" cannot be null.");
CloseableHttpClient httpClient = HttpClients.createDefault();
HttpGet request = new HttpGet(url);
request.addHeader("k1","v1");
try {
CloseableHttpResponse response = httpClient.execute(request);
HttpEntity entity = response.getEntity();
if (entity!=null){
result= EntityUtils.toString(entity);
System.out.println(result);
}
} catch (IOException e) {
e.printStackTrace();
}
return result;
}
public static String postForm(String url, Map<String,String> params){
Assert.isTrue(!StringUtils.isEmpty(url),"the parameter url: "+ url+" cannot be null.");
Assert.isTrue(!ObjectUtils.isEmpty(params),"the parameter params: "+ params+" cannot be null.");
String result = "";
HttpPost request = new HttpPost(url);
List<NameValuePair> urlParameters = new ArrayList<>();
for (Map.Entry<String,String> en: params.entrySet()){
urlParameters.add(new BasicNameValuePair(en.getKey(),en.getValue()));
}
try {
request.setEntity(new UrlEncodedFormEntity(urlParameters));
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = httpClient.execute(request);
result= EntityUtils.toString(response.getEntity());
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
public static String postJson(String url, Map<String,String> params){
Assert.isTrue(!StringUtils.isEmpty(url),"the parameter url: "+ url+" cannot be null.");
Assert.isTrue(!ObjectUtils.isEmpty(params),"the parameter params: "+ params+" cannot be null.");
String result = "";
HttpPost request = new HttpPost(url);
request.addHeader("content-type","application/json");
try {
request.setEntity(new StringEntity(JSON.toJSONString(params),ContentType.APPLICATION_JSON));
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = httpClient.execute(request);
result= EntityUtils.toString(response.getEntity());
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
public static boolean uploadFile(String url, File file){
Assert.isTrue(!StringUtils.isEmpty(url),"the parameter url cannot be null.");
Assert.isTrue(!ObjectUtils.isEmpty(file),"the parameter file cannot be null.");
boolean flag = false;
HttpPost request = new HttpPost(url);
request.setEntity(new FileEntity(file));
CloseableHttpClient httpClient = HttpClients.createDefault();
try {
CloseableHttpResponse response = httpClient.execute(request);
if (response.getStatusLine().getStatusCode()==200){
flag=true;
}
} catch (IOException e) {
e.printStackTrace();
}
return flag;
}
}
- 源码
/*xxx: httpClient factory */
public class HttpClients {
/*xxx: create the default httpClient; delegates to HttpClientBuilder.create().build() */
public static CloseableHttpClient createDefault() {
return HttpClientBuilder.create().build();
}
}
/*xxx: httpClient builder */
public class HttpClientBuilder {
/*xxx: request executor */
private HttpRequestExecutor requestExec;
/*xxx: connection manager */
private HttpClientConnectionManager connManager;
/*xxx: connection keep-alive strategy */
private ConnectionKeepAliveStrategy keepAliveStrategy;
/*xxx: cookie store */
private CookieStore cookieStore;
/*xxx: static factory for a new builder */
public static HttpClientBuilder create() {
return new HttpClientBuilder();
}
public CloseableHttpClient build() {
//xxx: the client is assembled from the configured options; body omitted in this excerpt
}
}
/*xxx: http request executor (excerpt: method bodies are abridged and show only the key call) */
public class HttpRequestExecutor {
/*xxx: execute the http request; delegates to doSendRequest */
public HttpResponse execute(
final HttpRequest request,
final HttpClientConnection conn,
final HttpContext context) throws IOException, HttpException {
HttpResponse response = doSendRequest(request, conn, context);
}
/*xxx: send the request; the request entity is written through the connection */
protected HttpResponse doSendRequest(
final HttpRequest request,
final HttpClientConnection conn,
final HttpContext context) throws IOException, HttpException {
conn.sendRequestEntity((HttpEntityEnclosingRequest) request);
}
}
/*xxx: http connection */
public interface HttpConnection extends Closeable {
@Override
/*xxx: release resources */
void close() throws IOException;
/*xxx: whether the connection is open */
boolean isOpen();
/*xxx: set the socket timeout */
void setSocketTimeout(int timeout);
/*xxx: shut the connection down */
void shutdown() throws IOException;
}
/*xxx: abstraction of a client-side http connection */
public interface HttpClientConnection extends HttpConnection {
/*xxx: whether a response is available within the timeout; used to check that the connection is alive */
boolean isResponseAvailable(int timeout)
throws IOException;
/*xxx: send the request header */
void sendRequestHeader(HttpRequest request)
throws HttpException, IOException;
/*xxx: send the request body */
void sendRequestEntity(HttpEntityEnclosingRequest request)
throws HttpException, IOException;
/*xxx: receive the response header */
HttpResponse receiveResponseHeader()
throws HttpException, IOException;
/*xxx: receive the response body */
void receiveResponseEntity(HttpResponse response)
throws HttpException, IOException;
}
- 连接的建立流程
/*xxx: manager of client-side http connections */
public interface HttpClientConnectionManager {
/*xxx: connect to the server */
void connect(
HttpClientConnection conn,
HttpRoute route,
int connectTimeout,
HttpContext context) throws IOException;
}
@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public class PoolingHttpClientConnectionManager
implements HttpClientConnectionManager, ConnPoolControl<HttpRoute>, Closeable {
/*xxx: establish the connection to the server (excerpt: conn and host are not declared in the lines shown) */
@Override
public void connect(
final HttpClientConnection managedConn,
final HttpRoute route,
final int connectTimeout,
final HttpContext context) throws IOException {
//xxx: establish the connection by delegating to the connection operator
this.connectionOperator.connect(
conn, host, route.getLocalSocketAddress(), connectTimeout, resolveSocketConfig(host), context);
}
}
/*xxx: http connection operations */
public interface HttpClientConnectionOperator {
/*xxx: connect */
void connect(
ManagedHttpClientConnection conn,
HttpHost host,
InetSocketAddress localAddress,
int connectTimeout,
SocketConfig socketConfig,
HttpContext context) throws IOException;
/*xxx: protocol upgrade */
void upgrade(
ManagedHttpClientConnection conn,
HttpHost host,
HttpContext context) throws IOException;
}
@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
public class DefaultHttpClientConnectionOperator implements HttpClientConnectionOperator {
/*xxx: excerpt: connecting is delegated to a socket factory (sf); sf and remoteAddress are not declared in the lines shown */
@Override
public void connect(
final ManagedHttpClientConnection conn,
final HttpHost host,
final InetSocketAddress localAddress,
final int connectTimeout,
final SocketConfig socketConfig,
final HttpContext context) throws IOException {
Socket sock = sf.connectSocket(
connectTimeout, sock, host, remoteAddress, localAddress, context);
}
}
/*xxx: socket connection factory */
public interface ConnectionSocketFactory {
/*xxx: connect to the remote socket */
Socket connectSocket(
int connectTimeout,
Socket sock,
HttpHost host,
InetSocketAddress remoteAddress,
InetSocketAddress localAddress,
HttpContext context) throws IOException;
}
@Contract(threading = ThreadingBehavior.IMMUTABLE)
public class PlainConnectionSocketFactory implements ConnectionSocketFactory {
/*xxx: uses the given socket, or creates one when it is null, then connects it to remoteAddress with connectTimeout */
@Override
public Socket connectSocket(
final int connectTimeout,
final Socket socket,
final HttpHost host,
final InetSocketAddress remoteAddress,
final InetSocketAddress localAddress,
final HttpContext context) throws IOException {
final Socket sock = socket != null ? socket : createSocket(context);
sock.connect(remoteAddress, connectTimeout);
return sock;
}
}
# 开源RestTemplate
- 案例
package com.automannn.demo.httpClient.restTemplate;
import com.alibaba.fastjson.JSON;
import org.springframework.http.HttpInputMessage;
import org.springframework.http.HttpOutputMessage;
import org.springframework.http.MediaType;
import org.springframework.http.converter.FormHttpMessageConverter;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.HttpMessageNotReadableException;
import org.springframework.http.converter.HttpMessageNotWritableException;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author automannn
* @Date 2022/4/6
*/
public class RestTemplateUtil {
public static String get(String url) {
Assert.isTrue(!StringUtils.isEmpty(url), "the parameter url: " + url + " cannot be null.");
RestTemplate restTemplate = new RestTemplate();
String result = restTemplate.getForObject(url, String.class);
return result;
}
public static String postForm(String url, Map<String, String> params) {
Assert.isTrue(!StringUtils.isEmpty(url), "the parameter url: " + url + " cannot be null.");
Assert.isTrue(!ObjectUtils.isEmpty(params), "the parameter params: " + params + " cannot be null.");
RestTemplate restTemplate = new RestTemplate();
restTemplate.getMessageConverters().add(new HttpMessageConverter<HashMap<String,String>>() {
@Override
public boolean canRead(Class<?> aClass, MediaType mediaType) {
return false;
}
@Override
public boolean canWrite(Class<?> aClass, MediaType mediaType) {
return false;
}
@Override
public List<MediaType> getSupportedMediaTypes() {
return null;
}
@Override
public HashMap<String, String> read(Class<? extends HashMap<String, String>> aClass, HttpInputMessage httpInputMessage) throws IOException, HttpMessageNotReadableException {
return null;
}
@Override
public void write(HashMap<String, String> stringStringHashMap, MediaType mediaType, HttpOutputMessage httpOutputMessage) throws IOException, HttpMessageNotWritableException {
}
});
String result = restTemplate.postForObject(url, params, String.class);
return result;
}
public static String postJson(String url, Map<String, String> params) {
Assert.isTrue(!StringUtils.isEmpty(url), "the parameter url: " + url + " cannot be null.");
Assert.isTrue(!ObjectUtils.isEmpty(params), "the parameter params: " + params + " cannot be null.");
RestTemplate restTemplate = new RestTemplate();
String result = restTemplate.postForObject(url, JSON.toJSONString(params), String.class);
return result;
}
}
- 源码
/*xxx: rest operations */
public interface RestOperations {
@Nullable
/*xxx: GET and return an object of the given type */
<T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws RestClientException;
/*xxx: GET and return the response entity */
<T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Object... uriVariables)
throws RestClientException;
/*xxx: HEAD and return the response headers */
HttpHeaders headForHeaders(String url, Object... uriVariables) throws RestClientException;
@Nullable
/*xxx: POST and return an object of the given type */
<T> T postForObject(String url, @Nullable Object request, Class<T> responseType,
Object... uriVariables) throws RestClientException;
/*xxx: POST and return a response entity of the given type */
<T> ResponseEntity<T> postForEntity(String url, @Nullable Object request, Class<T> responseType,
Object... uriVariables) throws RestClientException;
/*xxx: PUT to upload data */
void put(String url, @Nullable Object request, Object... uriVariables) throws RestClientException;
/*xxx: DELETE to remove an object */
void delete(String url, Object... uriVariables) throws RestClientException;
@Nullable
/*xxx: execute directly; the HTTP method is passed as a parameter */
<T> T execute(String url, HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor, Object... uriVariables)
throws RestClientException;
}
/*xxx: http accessor */
public abstract class HttpAccessor {
/*xxx: initializers applied to each client http request */
private final List<ClientHttpRequestInitializer> clientHttpRequestInitializers = new ArrayList<>();
/*xxx: client request factory, i.e. the underlying network implementation; by default the native HttpUrlConnection-based one */
private ClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
/*xxx: create the client request, then run the initializers on it (excerpt: the return statement is omitted) */
protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
ClientHttpRequest request = getRequestFactory().createRequest(url, method);
initialize(request);
}
/*xxx: apply every registered initializer to the request */
private void initialize(ClientHttpRequest request) {
this.clientHttpRequestInitializers.forEach(initializer -> initializer.initialize(request));
}
}
/*xxx: http accessor with request interception */
public abstract class InterceptingHttpAccessor extends HttpAccessor {
/*xxx: list of request interceptors */
private final List<ClientHttpRequestInterceptor> interceptors = new ArrayList<>();
@Override
/*xxx: client request factory (excerpt: the declaration of factory and the return statement are omitted) */
public ClientHttpRequestFactory getRequestFactory() {
List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
/*xxx: wrap the parent's request factory so that the interceptors are applied */
factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
}
}
/*xxx: rest template */
public class RestTemplate extends InterceptingHttpAccessor implements RestOperations {
/*xxx: list of message converters */
private final List<HttpMessageConverter<?>> messageConverters = new ArrayList<>();
/*xxx: error handler */
private ResponseErrorHandler errorHandler = new DefaultResponseErrorHandler();
/*xxx: extractor for the http response headers */
private final ResponseExtractor<HttpHeaders> headersExtractor = new HeadersExtractor();
/*xxx: the underlying network implementation can be chosen when the template is created */
public RestTemplate(ClientHttpRequestFactory requestFactory) {
this();
setRequestFactory(requestFactory);
}
@Override
public <T> ResponseEntity<T> postForEntity(String url, @Nullable Object request,
Class<T> responseType, Map<String, ?> uriVariables) throws RestClientException {
/*xxx: request callback */
RequestCallback requestCallback = httpEntityCallback(request, responseType);
/*xxx: response extractor */
ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(responseType);
return nonNull(execute(url, HttpMethod.POST, requestCallback, responseExtractor, uriVariables));
}
@Nullable
/*xxx: execute the request (excerpt: request and response are not declared in the lines shown) */
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
/*xxx: request callback: sets up the request headers / data format */
requestCallback.doWithRequest(request);
/*xxx: execute the request and obtain the response */
response = request.execute();
/*xxx: handle the response; errors, if any, are handled here */
handleResponse(url, method, response);
/*xxx: extract the result through the response extractor */
return (responseExtractor != null ? responseExtractor.extractData(response) : null);
}
}
/*xxx: client request */
public interface ClientHttpRequest extends HttpRequest, HttpOutputMessage {
/*xxx: execute the request */
ClientHttpResponse execute() throws IOException;
}
/*xxx: abstract base of client http requests */
public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
@Override
public final ClientHttpResponse execute() throws IOException {
assertNotExecuted();
/*xxx: execute the request via the subclass hook, then mark this request as executed */
ClientHttpResponse result = executeInternal(this.headers);
this.executed = true;
return result;
}
/*xxx: execute the request; implemented by subclasses */
protected abstract ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException;
}
/*xxx: the default client implementation, backed by an HttpURLConnection */
final class SimpleStreamingClientHttpRequest extends AbstractClientHttpRequest {
private final HttpURLConnection connection;
@Override
protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
this.connection.connect();
this.connection.getResponseCode();
return new SimpleClientHttpResponse(this.connection);
}
}
@Deprecated
/*xxx: asynchronous client implemented on netty4; created by the factory class Netty4ClientHttpRequestFactory */
/*xxx: in spring5 reactive development is served by ReactorClientHttpConnector, so this feature is deprecated */
class Netty4ClientHttpRequest extends AbstractAsyncClientHttpRequest implements ClientHttpRequest {
@Override
public ClientHttpResponse execute() throws IOException {
return executeAsync().get();
}
@Override
/*xxx: execute the request (excerpt: only the connect call is shown) */
protected ListenableFuture<ClientHttpResponse> executeInternal(final HttpHeaders headers) throws IOException {
this.bootstrap.connect(this.uri.getHost(), getPort(this.uri)).addListener(connectionListener);
}
}
此外,客户端的底层实现还可由 `okhttp3`、`apache-httpclient` 提供