产生原因
- 应用程序write写入的字节大小小于套接口发送缓冲区的大小
- 进行MSS大小的TCP分段、以太网帧的payload大于MTU进行IP分片等
业界协议的三种方案
- 消息定长,例如每个报文的大小固定未200个字节,如果不够,空位补空格
- 在包尾部增加特殊字符进行分割,例如加回车
- 消息分为消息头和消息体,在消息头中包含消息总长度的字段,然后进行业务处理
Netty 处理方式
消息定长
public static void main(String[] args) throws InterruptedException {
//1. 创建两个线程组: 一个用于进行网络连接接受的 另一个用于我们的实际处理(网络通信的读写)
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
//2. 通过辅助类去构造server/client
ServerBootstrap b = new ServerBootstrap();
//3. 进行Nio Server的基础配置
//3.1 绑定两个线程组
b.group(bossGroup, workGroup)
//3.2 因为是server端,所以需要配置NioServerSocketChannel
.channel(NioServerSocketChannel.class)
//3.3 设置链接超时时间
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
//3.4 设置TCP backlog参数 = sync队列 + accept队列
.option(ChannelOption.SO_BACKLOG, 1024)
//3.5 设置配置项 通信不延迟
.childOption(ChannelOption.TCP_NODELAY, true)
//3.6 设置配置项 接收与发送缓存区大小
.childOption(ChannelOption.SO_RCVBUF, 1024 * 32)
.childOption(ChannelOption.SO_SNDBUF, 1024 * 32)
//3.7 进行初始化 ChannelInitializer , 用于构建双向链表 "pipeline" 添加业务handler处理
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 设定长字符串接收
sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
sc.pipeline().addLast(new StringDecoder());
//3.8 这里仅仅只是添加一个业务处理器:ServerHandler(后面我们要针对他进行编码)
sc.pipeline().addLast(new ServerHandler());
}
});
//4. 服务器端绑定端口并启动服务;使用channel级别的监听close端口 阻塞的方式
ChannelFuture cf = b.bind(8765).sync();
cf.channel().closeFuture().sync();
//5. 释放资源
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
ServerHandler代码
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("server channel active..");
}
/**
* channelRead
* 读写数据核心方法
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//1. 读取客户端的数据(缓存中去取并打印到控制台)
String requestBody = (String) msg;
System.err.println("Server: " + requestBody);
//2. 返回响应数据
}
/**
* exceptionCaught
* 捕获异常方法
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
控制台展示
server channel active..
Server: 12345
再次点击发送,控制台展示
server channel active..
Server: 12345
Server: 61234
原因
服务器按照定长接收,因为缓冲区不到5个字节,所以没有解析。
特殊字符
public static void main(String[] args) throws InterruptedException {
//1. 创建两个线程组: 一个用于进行网络连接接受的 另一个用于我们的实际处理(网络通信的读写)
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
//2. 通过辅助类去构造server/client
ServerBootstrap b = new ServerBootstrap();
//3. 进行Nio Server的基础配置
//3.1 绑定两个线程组
b.group(bossGroup, workGroup)
//3.2 因为是server端,所以需要配置NioServerSocketChannel
.channel(NioServerSocketChannel.class)
//3.3 设置链接超时时间
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
//3.4 设置TCP backlog参数 = sync队列 + accept队列
.option(ChannelOption.SO_BACKLOG, 1024)
//3.5 设置配置项 通信不延迟
.childOption(ChannelOption.TCP_NODELAY, true)
//3.6 设置配置项 接收与发送缓存区大小
.childOption(ChannelOption.SO_RCVBUF, 1024 * 32)
.childOption(ChannelOption.SO_SNDBUF, 1024 * 32)
//3.7 进行初始化 ChannelInitializer , 用于构建双向链表 "pipeline" 添加业务handler处理
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 设置特殊字符分割
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
// 设定长字符串接收
// sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
sc.pipeline().addLast(new StringDecoder());
//3.8 这里仅仅只是添加一个业务处理器:ServerHandler(后面我们要针对他进行编码)
sc.pipeline().addLast(new ServerHandler());
}
});
//4. 服务器端绑定端口并启动服务;使用channel级别的监听close端口 阻塞的方式
ChannelFuture cf = b.bind(8765).sync();
cf.channel().closeFuture().sync();
//5. 释放资源
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
用数据调试模拟器发送123456
server channel active..
用数据调试模拟器再次发送123456$_,控制台展示
Server: 123456123456
原因
服务器开始没有找到特殊字符,认为当前消息没有接收完毕,所以没有处理,当收到特殊字符$_后,认为当前消息接收完毕,然后才打印消息