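Note: this is only an attempt at reading source code to practice the skill; my understanding is not necessarily correct.

Creating the Netty server

The setup code is as follows: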

public static void main(String[] args) {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    // Boss group: accepts new connections
    NioEventLoopGroup boss = new NioEventLoopGroup();
    // Worker group: reads data from accepted connections and runs the business logic
    NioEventLoopGroup worker = new NioEventLoopGroup();
    serverBootstrap
            .group(boss, worker)
            // Takes a Channel type; a factory that creates it via reflection is built from it
            .channel(NioServerSocketChannel.class)
            // NioSocketChannel is a channel type defined by Netty
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new StringDecoder());
                    nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
                            System.out.println(s);
                        }
                    });
                }
            }).bind(8000);
}

group(boss, worker)

The source is as follows:

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    private volatile EventLoopGroup childGroup;

    // other methods omitted

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        } else {
            this.childGroup = (EventLoopGroup)ObjectUtil.checkNotNull(childGroup, "childGroup");
            return this;
        }
    }
}

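As the code shows, boss (the parentGroup) is passed to the group method of the parent class, AbstractBootstrap, while worker is stored in ServerBootstrap's own childGroup field. The ServerBootstrap itself is then returned.

The parent class's group method is as follows: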

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    volatile EventLoopGroup group;

    // other methods omitted

    public B group(EventLoopGroup group) {
        ObjectUtil.checkNotNull(group, "group");
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        } else {
            this.group = group;
            return this.self();
        }
    }
}

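The group method does something fairly simple: it stores the two NioEventLoopGroups on the ServerBootstrap (the parent group in the field inherited from AbstractBootstrap, the child group in childGroup) and returns the ServerBootstrap so that calls can be chained. Each group can only be set once; a second call throws IllegalStateException.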
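To make the pattern easier to see in isolation, here is a minimal sketch of the same idea without Netty: a self-typed base class whose setter returns the concrete subclass, plus a set-once check. The names BaseBuilder, ServerBuilder, and GroupSketch are hypothetical, and plain strings stand in for EventLoopGroup; this is my own simplification, not Netty's code.

```java
import java.util.Objects;

// Hypothetical stand-in for AbstractBootstrap: B is the concrete subclass type.
abstract class BaseBuilder<B extends BaseBuilder<B>> {
    protected String group; // a String stands in for EventLoopGroup

    @SuppressWarnings("unchecked")
    protected B self() {
        return (B) this;
    }

    public B group(String group) {
        Objects.requireNonNull(group, "group");
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return self();
    }
}

// Hypothetical stand-in for ServerBootstrap: adds a second, child group.
class ServerBuilder extends BaseBuilder<ServerBuilder> {
    private String childGroup;

    public ServerBuilder group(String parentGroup, String childGroup) {
        super.group(parentGroup); // the parent group is stored by the base class
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = Objects.requireNonNull(childGroup, "childGroup");
        return this;
    }

    public String describe() {
        return "parent=" + group + ", child=" + childGroup;
    }
}

class GroupSketch {
    public static void main(String[] args) {
        ServerBuilder b = new ServerBuilder().group("boss", "worker");
        System.out.println(b.describe()); // parent=boss, child=worker
        try {
            b.group("boss2", "worker2"); // a second call is rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // group set already
        }
    }
}
```

The self-type parameter is what lets a method declared in the base class return the subclass type, so a chain such as group(...).channel(...) keeps the ServerBootstrap-specific methods available.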

channel(NioServerSocketChannel.class)

public B channel(Class<? extends C> channelClass) {
    return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory((Class)ObjectUtil.checkNotNull(channelClass, "channelClass"))));
}

This code creates a factory, new ReflectiveChannelFactory(), whose constructor takes the channelClass that was passed in.

The factory class is as follows:

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");

        try {
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException var3) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) + " does not have a public non-arg constructor", var3);
        }
    }
}

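In essence, it uses reflection to obtain the public no-arg constructor of the class that was passed in.

The other method involved is this.channelFactory(); its source is as follows: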

public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
    return this.channelFactory((ChannelFactory)channelFactory);
}

This overload casts its argument and delegates to the following one:

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    ObjectUtil.checkNotNull(channelFactory, "channelFactory");
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    } else {
        this.channelFactory = channelFactory;
        return this.self();
    }
}

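What happens here is that the ReflectiveChannelFactory created in the previous step is assigned to the channelFactory field of AbstractBootstrap.

Because the constructor of the given class was already obtained when the ReflectiveChannelFactory was created, the factory can later create objects of that class.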
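The excerpt above only shows the constructor lookup, so here is a small, hedged sketch of the whole round trip: look the constructor up once, then invoke it whenever a new object is needed. ReflectiveFactory, DemoChannel, and FactorySketch are hypothetical names of my own; in Netty the method that creates the channel is called newChannel(), which the excerpt omits.

```java
import java.lang.reflect.Constructor;

// Hypothetical channel type with a public no-arg constructor.
class DemoChannel {
    public DemoChannel() { }

    public String name() {
        return "demo-channel";
    }
}

// Sketch of a reflective factory: resolve the constructor once, invoke it on demand.
class ReflectiveFactory<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactory(Class<? extends T> clazz) {
        try {
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "Class " + clazz.getSimpleName() + " does not have a public no-arg constructor", e);
        }
    }

    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create an instance of " + constructor.getDeclaringClass(), e);
        }
    }
}

class FactorySketch {
    public static void main(String[] args) {
        ReflectiveFactory<DemoChannel> factory = new ReflectiveFactory<>(DemoChannel.class);
        DemoChannel a = factory.newInstance();
        DemoChannel b = factory.newInstance();
        System.out.println(a.name()); // demo-channel
        System.out.println(a != b);   // true: each call builds a fresh object
    }
}
```

Resolving the constructor in the factory's own constructor means a class without a public no-arg constructor is rejected at configuration time rather than later, when the first channel is needed.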

childHandler(new ChannelInitializer(){})

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    private volatile ChannelHandler childHandler;

    // other methods omitted

    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        this.childHandler = (ChannelHandler)ObjectUtil.checkNotNull(childHandler, "childHandler");
        return this;
    }

}

This is just a field assignment; the part worth looking at is how the ChannelHandler is created.

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
    // This is an abstract method and must be implemented
    protected abstract void initChannel(C var1) throws Exception;
}

// The override from the example is as follows:
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
    nioSocketChannel.pipeline().addLast(new StringDecoder());
    nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
        // The overridden method is where our business logic goes;
        // it should be called when the client sends data
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
            System.out.println(s);
        }
    });
}

channelRead0, the method overridden in the example, is declared in SimpleChannelInboundHandler:

public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
    // other code omitted
    protected abstract void channelRead0(ChannelHandlerContext var1, I var2) throws Exception;

}
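The excerpt does not show who calls channelRead0. As far as I understand, the handler's untyped channelRead entry point checks whether the message matches the type parameter, calls channelRead0 if it does, and otherwise passes the message on to the next handler. Below is a minimal sketch of that template-method idea without Netty. TypedHandler, passedOn, and HandlerSketch are hypothetical names, and the explicit Class token is my simplification (Netty resolves the type parameter itself).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for SimpleChannelInboundHandler<I>.
abstract class TypedHandler<I> {
    private final Class<I> type;
    // Stand-in for "forwarded to the next handler in the pipeline".
    final List<Object> passedOn = new ArrayList<>();

    TypedHandler(Class<I> type) {
        this.type = type;
    }

    // Untyped entry point, as a pipeline would call it.
    final void channelRead(Object msg) {
        if (type.isInstance(msg)) {
            channelRead0(type.cast(msg)); // typed callback implemented by the user
        } else {
            passedOn.add(msg);
        }
    }

    protected abstract void channelRead0(I msg);
}

class HandlerSketch {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        TypedHandler<String> handler = new TypedHandler<String>(String.class) {
            @Override
            protected void channelRead0(String msg) {
                seen.add(msg);
            }
        };
        handler.channelRead("hello"); // matches the type, handled
        handler.channelRead(42);      // does not match, passed on
        System.out.println(seen);             // [hello]
        System.out.println(handler.passedOn); // [42]
    }
}
```

This also suggests why the example registers StringDecoder first: by the time a message reaches the SimpleChannelInboundHandler<String>, it has already been decoded into a String and therefore matches.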

Creating the Netty client

The setup code is as follows:

public static void main(String[] args) throws InterruptedException {
    Bootstrap bootstrap = new Bootstrap();
    NioEventLoopGroup group = new NioEventLoopGroup();

    bootstrap.group(group)
            .channel(NioSocketChannel.class)
            // Explicit type argument so this also compiles on Java 8
            .handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel channel) throws Exception {
                    channel.pipeline().addLast(new StringEncoder());
                }
            });

    // connect() is asynchronous; sync() waits until the connection is established before writing
    Channel channel = bootstrap.connect("127.0.0.1", 8000).sync().channel();

    while (true) {
        channel.writeAndFlush(new Date() + ": hello world");
        Thread.sleep(3000);
    }
}

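Here is the first difference: the server uses ServerBootstrap, while the client uses Bootstrap.

Bootstrap does not define its own group method; it calls the parent class's group directly, which means the group passed in is assigned to the EventLoopGroup field (group) of AbstractBootstrap.

The rest of the code is much the same as on the server side; the main things to look at are writeAndFlush and connect.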

writeAndFlush

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    // The default pipeline
    private final DefaultChannelPipeline pipeline;

    public ChannelFuture writeAndFlush(Object msg) {
        return this.pipeline.writeAndFlush(msg);
    }
}

The channel delegates to its pipeline, and the pipeline in turn delegates to its tail context:

public class DefaultChannelPipeline implements ChannelPipeline {

    final AbstractChannelHandlerContext tail;

    public final ChannelFuture writeAndFlush(Object msg) {
        return this.tail.writeAndFlush(msg);
    }
}
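Starting the write at the tail means an outbound event travels through the pipeline backwards, from the tail towards the head, visiting only the handlers that deal with outbound events. The sketch below illustrates that traversal with a tiny linked list. Ctx, MiniPipeline, and PipelineSketch are hypothetical names, and a boolean flag stands in for Netty's real handler-type check, so treat it as an illustration of the direction of travel rather than of the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified context node: a name plus whether it handles outbound events.
class Ctx {
    final String name;
    final boolean outbound;
    Ctx prev;
    Ctx next;

    Ctx(String name, boolean outbound) {
        this.name = name;
        this.outbound = outbound;
    }
}

// Hypothetical, simplified pipeline: a doubly linked list between fixed head and tail nodes.
class MiniPipeline {
    final Ctx head = new Ctx("head", true);  // the head handles outbound events
    final Ctx tail = new Ctx("tail", false); // the tail handles inbound events only

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    // Inserts a new context just before the tail.
    MiniPipeline addLast(String name, boolean outbound) {
        Ctx ctx = new Ctx(name, outbound);
        Ctx last = tail.prev;
        last.next = ctx;
        ctx.prev = last;
        ctx.next = tail;
        tail.prev = ctx;
        return this;
    }

    // Starts at the tail and walks backwards, recording each outbound context visited.
    List<String> writeAndFlush() {
        List<String> visited = new ArrayList<>();
        for (Ctx ctx = tail.prev; ctx != null; ctx = ctx.prev) {
            if (ctx.outbound) {
                visited.add(ctx.name);
            }
        }
        return visited;
    }
}

class PipelineSketch {
    public static void main(String[] args) {
        MiniPipeline pipeline = new MiniPipeline()
                .addLast("stringDecoder", false)
                .addLast("stringEncoder", true);
        System.out.println(pipeline.writeAndFlush()); // [stringEncoder, head]
    }
}
```

In the client example this is why the String passed to writeAndFlush reaches the StringEncoder before it is written to the socket.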

The tail is an AbstractChannelHandlerContext, and this is where the actual work happens:

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {

    final EventExecutor executor;

    private final DefaultChannelPipeline pipeline;

    public ChannelFuture writeAndFlush(Object msg) {
        return this.writeAndFlush(msg, this.newPromise());
    }

    public ChannelPromise newPromise() {
        return new DefaultChannelPromise(this.channel(), this.executor());
    }

    public Channel channel() {
        return this.pipeline.channel();
    }

    public EventExecutor executor() {
        return (EventExecutor)(this.executor == null ? this.channel().eventLoop() : this.executor);
    }

    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        this.write(msg, true, promise);
        return promise;
    }

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        ObjectUtil.checkNotNull(msg, "msg");

        try {
            // If the promise is not valid, release the message and return
            if (this.isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                return;
            }
        } catch (RuntimeException var8) {
            ReferenceCountUtil.release(msg);
            throw var8;
        }

        // The decompiler printed these arguments as 98304 and '耀' (U+8000 = 32768);
        // they are the ChannelHandlerMask constants MASK_WRITE (1 << 15) and MASK_FLUSH (1 << 16)
        AbstractChannelHandlerContext next = this.findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        Object m = this.pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        // Check whether the current thread is the executor's event loop thread
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            // Otherwise wrap the write in a task and hand it to the event loop
            AbstractChannelHandlerContext.WriteTask task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise, flush);
            if (!safeExecute(executor, task, promise, m, !flush)) {
                task.cancel();
            }
        }

    }
}
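The last branch is the part I find most interesting: if the caller is already on the event loop thread, the write is invoked directly; otherwise it is wrapped in a task and handed to the event loop. In the client example, writeAndFlush is called from the main thread, so it should take the second path. Here is a minimal sketch of that dispatch rule using only the JDK. MiniLoop, runOrSubmit, and LoopSketch are hypothetical names, and a single-threaded executor stands in for Netty's EventLoop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical single-threaded loop, loosely modeled on an event loop.
class MiniLoop {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-loop");
        loopThread = t; // remember which thread is the loop thread
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Runs the task inline when already on the loop thread, otherwise queues it.
    // Returns true if the task ran inline.
    boolean runOrSubmit(Runnable task) {
        if (inEventLoop()) {
            task.run();
            return true;
        }
        executor.execute(task);
        return false;
    }

    void shutdown() {
        executor.shutdown();
    }
}

class LoopSketch {
    public static void main(String[] args) throws InterruptedException {
        MiniLoop loop = new MiniLoop();
        CountDownLatch done = new CountDownLatch(1);
        boolean[] inline = new boolean[2];
        String[] ranOn = new String[2];

        // Called from main, which is not the loop thread, so the task is queued.
        inline[0] = loop.runOrSubmit(() -> {
            ranOn[0] = Thread.currentThread().getName();
            // Called from the loop thread, so this nested task runs inline.
            inline[1] = loop.runOrSubmit(() -> ranOn[1] = Thread.currentThread().getName());
            done.countDown();
        });

        done.await(5, TimeUnit.SECONDS);
        loop.shutdown();
        System.out.println(inline[0] + " " + inline[1] + " " + ranOn[0] + " " + ranOn[1]);
    }
}
```

Either way the work ends up on the same thread, which is presumably what lets handler code run without extra locking.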