EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) 创建boss线程组,负责数据读写
EventLoopGroup workerGroup = new NioEventLoopGroup(); // (2) 创建worker线程组,服务类
try {
ServerBootstrap bootstrap = new ServerBootstrap();
// (3) 设置使用ReflectiveChannelFactory通过反射创建NioServerSocketChannel对象
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// (4) 初始化childHandler,添加DiscardServerHandler处理器
.childHandler(new ChannelInitializer
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
});
// (5) 设置完成三次握手后等待队列的大小,超过了则拒绝,默认50
bootstrap.option(ChannelOption.SO_BACKLOG, 128);
// (6) 设置childOption,启用SO_KEEPALIVE选项以保持TCP连接活跃状态
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
// (7) 绑定并启动接收传入的连接请求
ChannelFuture f = bootstrap.bind(5555).sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// (8) 当关闭时,释放boss和worker线程组资源
if (f != null && f.isSuccess()) {
f.channel().closeFuture().sync();
}
}
AbstractBootstrap
接下来进入dobind方法,该方法首先创建一个Channel对象,然后调用initAndRegister()方法进行初始化和注册。如果注册过程中出现异常,直接返回regFuture。如果注册成功,创建一个新的ChannelPromise对象,并调用doBind0()方法完成绑定操作。最后返回promise。
代码如下:
```java
private ChannelFuture doBind(final SocketAddress localAddress) {
// 创建 Channel 对象
final Channel channel = initAndRegister();
// 如果注册过程中出现异常,直接返回 regFuture
if (channel.registry().isRegistered(this)) {
return new DefaultChannelPromise(channel, this);
}
// 在这一点上,我们知道注册是完整和成功的
final ChannelPromise promise = channel.newPromise();
// 调用 doBind0() 方法完成绑定操作
regFuture = doBind0(regFuture, channel, localAddress, promise);
// 返回 promise
return promise;
}
```
这段代码主要完成了NioServerSocketChannel的初始化和注册。以下是重构后的代码:
```java
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 创建之前设置的NioServerSocketChannel对象
channel = channelFactory.newChannel();
// 设置channel属性并初始化pipeline,新建ServerBootstrapAcceptor
ServerBootstrapAcceptor serverBootstrapAcceptor = new ServerBootstrapAcceptor();
serverBootstrapAcceptor.init(channel);
// 客户端ChannelInitializer初始化
ChannelInitializer clientChannelInitializer = new ClientChannelInitializer();
clientChannelInitializer.initChannel(channel);
} catch (Throwable t) {
// 注册失败,我们注册到全局线程去执行
}
// ServerBootstrapConfig里面放了ServerBootstrap,获得线程池最后到SingleThreadEventLoop里面把线程池注册给channel
// 我们创建的channel的pipeline也在这里加入
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
```
重构后的方法首先创建了NioServerSocketChannel对象,并设置其属性。然后初始化一个ServerBootstrapAcceptor对象,并调用其init方法。接着创建一个ClientChannelInitializer对象,并调用其initChannel方法进行客户端的初始化。最后,将channel注册到全局线程中,并返回注册结果。
```java
void init(Channel channel) throws Exception {
final Map
synchronized (options) {
// 将之前填的属性设置到 channel 中
setChannelOptions(channel, options, logger);
}
ChannelPipeline p = channel.pipeline();
// 在 head 与 tail 之间添加 ChannelInitializer,后续会初始化这个 ChannelInitializer
p.addLast(new ChannelInitializer
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
```
在执行完AbstractBootstrap的initAndRegister方法后,我们进入到ServerBootstrapConfig类中,在这里我们创建了一个ServerBootstrap对象,并通过SingleThreadEventLoop将线程池注册到channel中。同时,我们也在此处创建了channel的pipeline。
具体过程如下:
1. 创建ServerBootstrapConfig对象并添加ServerBootstrap。
2. 通过config().group()获取MultithreadEventLoopGroup对象,然后调用register(channel)方法将线程池注册到channel中。
3. 在MultithreadEventLoopGroup类中,实现register(Channel channel)方法,该方法首先调用next().register(channel)将线程池注册到下一个EventLoop中。
4. 最后,在AbstractChannel类中实现register(EventLoop eventLoop, final ChannelPromise promise)方法,该方法将线程池注册到指定的EventLoop中,并设置promise完成信号。如果当前线程已经在eventLoop中运行,则直接调用register0(promise)方法;否则,提交并启动线程池以运行register0(promise)方法。
代码重构后如下:
```java
public void initAndRegister() {
ServerBootstrapConfig config = new ServerBootstrapConfig();
config.setServerBootstrap(new ServerBootstrap());
ChannelFuture regFuture = config.getMultithreadEventLoopGroup().register(config.getChannel());
regFuture.addListener(f -> {
if (f.isSuccess()) {
System.out.println("线程池已成功注册");
} else {
System.out.println("线程池注册失败");
}
});
}
```
private void register0(ChannelPromise promise) {
try {
// 调用doRegister()方法,将AbstractNioChannel的方法注册到select中
doRegister();
// 调用回调函数pipeline.invokeHandlerAddedIfNeeded()
pipeline.invokeHandlerAddedIfNeeded();
// 设置promise完成,避免阻塞
safeSetSuccess(promise);
// 将ChannelInitializer中的元素添加到pipeline中
pipeline.fireChannelRegistered();
// 如果通道从未注册过,则只启用通道激活。如果通道被撤销注册并重新注册,这将防止触发多个通道活动。
if (isActive()) {
boolean firstRegistration = true; // 用于判断是否是首次注册
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 如果是非首次注册且配置了自动读操作,则进行相关操作
beginRead();
}
}
} catch (Throwable t) {
// 异常处理逻辑
}
}
// 在此处补充注释缺失的代码:
if (!isActive()) {
beginRead();
} else {
safeSetSuccess(promise);
}
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 将channel注册到selector中,但为什么返回值是0呢?
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
// 初始化刚才添加的管道
initlizePipeline();
// 在网管道里面加上一个ServerBootstrapAcceptor
// 注意:管道分为inbound和outbound,读取和输出的时候要搞清楚
//将ChannelInitializer的东西添加到pipeline
// pipeline.fireChannelRegistered();
// 设置读、感兴趣的东西(accept等)并开始读取
beginRead();
break;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
} catch (Exception e) {
throw new RuntimeException("Error occurred during register", e);
}
}
}
// 在doRegister方法中,返回上一个register0方法的位置。
return register0();
在服务端启动时,会先调用channelRegistered()方法。在这个方法中,用户处理程序可以设置管道。然后,会触发doBind0()方法,该方法在channelregister()被触发之前被调用。这个方法给用户处理程序一个机会,在它的通道注册()实现中设置管道。接下来,获取NioEventLoop里面的线程池,并执行run()方法。在run()方法中,首先判断regFuture是否成功。如果成功,则通过管道将实际的channel注册地址绑定到本地地址上,并添加监听器ChannelFutureListener.CLOSE_ON_FAILURE。如果不成功,则将失败原因设置为promise的故障。这样就完成了服务端的启动。