EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) 创建boss线程组,负责数据读写

EventLoopGroup workerGroup = new NioEventLoopGroup(); // (2) 创建worker线程组,服务类

try {

ServerBootstrap bootstrap = new ServerBootstrap();

// (3) 设置使用ReflectiveChannelFactory通过反射创建NioServerSocketChannel对象

bootstrap.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

// (4) 初始化childHandler,添加DiscardServerHandler处理器

.childHandler(new ChannelInitializer() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast(new DiscardServerHandler());

}

});

// (5) 设置完成三次握手后等待队列的大小,超过了则拒绝,默认50

bootstrap.option(ChannelOption.SO_BACKLOG, 128);

// (6) 设置childOption,启用SO_KEEPALIVE选项以保持TCP连接活跃状态

bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

// (7) 绑定并启动接收传入的连接请求

ChannelFuture f = bootstrap.bind(5555).sync();

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

// (8) 当关闭时,释放boss和worker线程组资源

if (f != null && f.isSuccess()) {

f.channel().closeFuture().sync();

}

}

AbstractBootstrap

接下来进入dobind方法,该方法首先创建一个Channel对象,然后调用initAndRegister()方法进行初始化和注册。如果注册过程中出现异常,直接返回regFuture。如果注册成功,创建一个新的ChannelPromise对象,并调用doBind0()方法完成绑定操作。最后返回promise。

代码如下:

```java

private ChannelFuture doBind(final SocketAddress localAddress) {

// 创建 Channel 对象

final Channel channel = initAndRegister();

// 如果注册过程中出现异常,直接返回 regFuture

if (channel.registry().isRegistered(this)) {

return new DefaultChannelPromise(channel, this);

}

// 在这一点上,我们知道注册是完整和成功的

final ChannelPromise promise = channel.newPromise();

// 调用 doBind0() 方法完成绑定操作

regFuture = doBind0(regFuture, channel, localAddress, promise);

// 返回 promise

return promise;

}

```

这段代码主要完成了NioServerSocketChannel的初始化和注册。以下是重构后的代码:

```java

final ChannelFuture initAndRegister() {

Channel channel = null;

try {

// 创建之前设置的NioServerSocketChannel对象

channel = channelFactory.newChannel();

// 设置channel属性并初始化pipeline,新建ServerBootstrapAcceptor

ServerBootstrapAcceptor serverBootstrapAcceptor = new ServerBootstrapAcceptor();

serverBootstrapAcceptor.init(channel);

// 客户端ChannelInitializer初始化

ChannelInitializer clientChannelInitializer = new ClientChannelInitializer();

clientChannelInitializer.initChannel(channel);

} catch (Throwable t) {

// 注册失败,我们注册到全局线程去执行

}

// ServerBootstrapConfig里面放了ServerBootstrap,获得线程池最后到SingleThreadEventLoop里面把线程池注册给channel

// 我们创建的channel的pipeline也在这里加入

ChannelFuture regFuture = config().group().register(channel);

if (regFuture.cause() != null) {

if (channel.isRegistered()) {

channel.close();

} else {

channel.unsafe().closeForcibly();

}

}

return regFuture;

}

```

重构后的方法首先创建了NioServerSocketChannel对象,并设置其属性。然后初始化一个ServerBootstrapAcceptor对象,并调用其init方法。接着创建一个ClientChannelInitializer对象,并调用其initChannel方法进行客户端的初始化。最后,将channel注册到全局线程中,并返回注册结果。

```java

void init(Channel channel) throws Exception {

final Map, Object> options = options0();

synchronized (options) {

// 将之前填的属性设置到 channel 中

setChannelOptions(channel, options, logger);

}

ChannelPipeline p = channel.pipeline();

// 在 head 与 tail 之间添加 ChannelInitializer,后续会初始化这个 ChannelInitializer

p.addLast(new ChannelInitializer() {

@Override

public void initChannel(final Channel ch) throws Exception {

final ChannelPipeline pipeline = ch.pipeline();

ChannelHandler handler = config.handler();

if (handler != null) {

pipeline.addLast(handler);

}

ch.eventLoop().execute(new Runnable() {

@Override

public void run() {

pipeline.addLast(new ServerBootstrapAcceptor(

ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

}

});

}

});

}

```

在执行完AbstractBootstrap的initAndRegister方法后,我们进入到ServerBootstrapConfig类中,在这里我们创建了一个ServerBootstrap对象,并通过SingleThreadEventLoop将线程池注册到channel中。同时,我们也在此处创建了channel的pipeline。

具体过程如下:

1. 创建ServerBootstrapConfig对象并添加ServerBootstrap。

2. 通过config().group()获取MultithreadEventLoopGroup对象,然后调用register(channel)方法将线程池注册到channel中。

3. 在MultithreadEventLoopGroup类中,实现register(Channel channel)方法,该方法首先调用next().register(channel)将线程池注册到下一个EventLoop中。

4. 最后,在AbstractChannel类中实现register(EventLoop eventLoop, final ChannelPromise promise)方法,该方法将线程池注册到指定的EventLoop中,并设置promise完成信号。如果当前线程已经在eventLoop中运行,则直接调用register0(promise)方法;否则,提交并启动线程池以运行register0(promise)方法。

代码重构后如下:

```java

public void initAndRegister() {

ServerBootstrapConfig config = new ServerBootstrapConfig();

config.setServerBootstrap(new ServerBootstrap());

ChannelFuture regFuture = config.getMultithreadEventLoopGroup().register(config.getChannel());

regFuture.addListener(f -> {

if (f.isSuccess()) {

System.out.println("线程池已成功注册");

} else {

System.out.println("线程池注册失败");

}

});

}

```

private void register0(ChannelPromise promise) {

try {

// 调用doRegister()方法,将AbstractNioChannel的方法注册到select中

doRegister();

// 调用回调函数pipeline.invokeHandlerAddedIfNeeded()

pipeline.invokeHandlerAddedIfNeeded();

// 设置promise完成,避免阻塞

safeSetSuccess(promise);

// 将ChannelInitializer中的元素添加到pipeline中

pipeline.fireChannelRegistered();

// 如果通道从未注册过,则只启用通道激活。如果通道被撤销注册并重新注册,这将防止触发多个通道活动。

if (isActive()) {

boolean firstRegistration = true; // 用于判断是否是首次注册

if (firstRegistration) {

pipeline.fireChannelActive();

} else if (config().isAutoRead()) {

// 如果是非首次注册且配置了自动读操作,则进行相关操作

beginRead();

}

}

} catch (Throwable t) {

// 异常处理逻辑

}

}

// 在此处补充注释缺失的代码:

if (!isActive()) {

beginRead();

} else {

safeSetSuccess(promise);

}

protected void doRegister() throws Exception {

boolean selected = false;

for (;;) {

try {

// 将channel注册到selector中,但为什么返回值是0呢?

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

// 初始化刚才添加的管道

initlizePipeline();

// 在网管道里面加上一个ServerBootstrapAcceptor

// 注意:管道分为inbound和outbound,读取和输出的时候要搞清楚

//将ChannelInitializer的东西添加到pipeline

// pipeline.fireChannelRegistered();

// 设置读、感兴趣的东西(accept等)并开始读取

beginRead();

break;

} catch (CancelledKeyException e) {

if (!selected) {

eventLoop().selectNow();

selected = true;

} else {

throw e;

}

} catch (Exception e) {

throw new RuntimeException("Error occurred during register", e);

}

}

}

// 在doRegister方法中,返回上一个register0方法的位置。

return register0();

在服务端启动时,会先调用channelRegistered()方法。在这个方法中,用户处理程序可以设置管道。然后,会触发doBind0()方法,该方法在channelregister()被触发之前被调用。这个方法给用户处理程序一个机会,在它的通道注册()实现中设置管道。接下来,获取NioEventLoop里面的线程池,并执行run()方法。在run()方法中,首先判断regFuture是否成功。如果成功,则通过管道将实际的channel注册地址绑定到本地地址上,并添加监听器ChannelFutureListener.CLOSE_ON_FAILURE。如果不成功,则将失败原因设置为promise的故障。这样就完成了服务端的启动。