Note: this is only an attempt at reading the source code, to practice that skill. My understanding is not necessarily correct.
Creating the Netty server

The code that creates the server is as follows:

```java
public static void main(String[] args) {
    ServerBootstrap serverBootstrap = new ServerBootstrap();

    // boss accepts incoming connections, worker handles I/O on the accepted channels
    NioEventLoopGroup boss = new NioEventLoopGroup();
    NioEventLoopGroup worker = new NioEventLoopGroup();
    serverBootstrap
            .group(boss, worker)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new StringDecoder());
                    nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
                            System.out.println(s);
                        }
                    });
                }
            }).bind(8000);
}
```
group(boss, worker)

The source is as follows:

```java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    private volatile EventLoopGroup childGroup;

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        } else {
            this.childGroup = (EventLoopGroup)ObjectUtil.checkNotNull(childGroup, "childGroup");
            return this;
        }
    }

}
```
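As you can see, boss (the parentGroup) is passed to the group method of the parent class, AbstractBootstrap, while worker is stored in ServerBootstrap's own childGroup field. The ServerBootstrap itself is then returned.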
The parent class's group method is as follows:

```java
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    volatile EventLoopGroup group;

    public B group(EventLoopGroup group) {
        ObjectUtil.checkNotNull(group, "group");
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        } else {
            this.group = group;
            return this.self();
        }
    }

}
```
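Taken together, the group methods do something fairly simple: they store the two NioEventLoopGroup instances (boss in the group field inherited from AbstractBootstrap, worker in ServerBootstrap's childGroup field) and return the configured ServerBootstrap. Each field can be set only once; a second call throws IllegalStateException.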
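To make the self-typed return value concrete, here is a minimal sketch of the same pattern in plain Java, without Netty on the classpath. SketchBootstrap, SketchServerBootstrap and the String-typed groups are hypothetical stand-ins, not Netty classes; only the shape (a generic parameter B, a self() cast, and set-once checks) is taken from the excerpts above.

```java
// Hypothetical stand-ins for AbstractBootstrap / ServerBootstrap; not Netty classes.
abstract class SketchBootstrap<B extends SketchBootstrap<B>> {
    private String group; // plays the role of the EventLoopGroup field

    @SuppressWarnings("unchecked")
    B self() {
        return (B) this; // safe as long as each subclass passes itself as B
    }

    B group(String group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return self();
    }

    String group() {
        return group;
    }
}

final class SketchServerBootstrap extends SketchBootstrap<SketchServerBootstrap> {
    private String childGroup;

    SketchServerBootstrap group(String parentGroup, String childGroup) {
        super.group(parentGroup); // the parent stores the "boss" group
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        this.childGroup = childGroup; // the subclass stores the "worker" group
        return this;
    }

    String childGroup() {
        return childGroup;
    }
}

class SelfTypeDemo {
    public static void main(String[] args) {
        SketchServerBootstrap b = new SketchServerBootstrap().group("boss", "worker");
        System.out.println(b.group() + " / " + b.childGroup()); // prints "boss / worker"
        try {
            b.group("boss2", "worker2"); // second call hits the set-once check in the parent
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "group set already"
        }
    }
}
```

Because group returns B rather than the abstract base type, calls on the subclass can keep chaining subclass-specific methods, which is what lets the server code chain .group(...).channel(...).childHandler(...).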
channel(NioServerSocketChannel.class)

```java
public B channel(Class<? extends C> channelClass) {
    return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory((Class)ObjectUtil.checkNotNull(channelClass, "channelClass"))));
}
```
This code creates a factory object, new ReflectiveChannelFactory(), whose constructor takes the channelClass that was passed in. The factory class looks like this:

```java
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");

        try {
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException var3) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) + " does not have a public non-arg constructor", var3);
        }
    }
}
```
In other words, it uses reflection to obtain the public no-argument constructor of the class that was passed in.
The other method involved is this.channelFactory(). Its source is as follows:

```java
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
    return this.channelFactory((ChannelFactory)channelFactory);
}
```

```java
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    ObjectUtil.checkNotNull(channelFactory, "channelFactory");
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    } else {
        this.channelFactory = channelFactory;
        return this.self();
    }
}
```
What happens here is that the ReflectiveChannelFactory created in the previous step is assigned to the channelFactory field of AbstractBootstrap.
Because the ReflectiveChannelFactory obtained the constructor of the given class when it was created, it can later create instances of that class.
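The idea of capturing a constructor once and instantiating later can be sketched in plain Java as follows. This is an illustration under assumptions, not Netty's code: SketchChannel, SketchServerChannel, SketchNoDefaultCtorChannel and SketchReflectiveFactory are hypothetical names, and the error handling in newChannel is my own choice.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for Netty's Channel type.
interface SketchChannel {
    String name();
}

class SketchServerChannel implements SketchChannel {
    public SketchServerChannel() {}

    @Override
    public String name() {
        return "server-channel";
    }
}

// Has no public no-arg constructor, so the factory should reject it.
class SketchNoDefaultCtorChannel implements SketchChannel {
    public SketchNoDefaultCtorChannel(int port) {}

    @Override
    public String name() {
        return "never-created";
    }
}

final class SketchReflectiveFactory<T extends SketchChannel> {
    private final Constructor<? extends T> constructor;

    SketchReflectiveFactory(Class<? extends T> clazz) {
        try {
            // Look the constructor up once, at configuration time.
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "Class " + clazz.getSimpleName() + " does not have a public no-arg constructor", e);
        }
    }

    T newChannel() {
        try {
            // Each call creates a fresh instance through the cached constructor.
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create instance", e);
        }
    }
}

class ReflectiveFactoryDemo {
    public static void main(String[] args) {
        SketchReflectiveFactory<SketchChannel> factory =
                new SketchReflectiveFactory<>(SketchServerChannel.class);
        System.out.println(factory.newChannel().name()); // prints "server-channel"
    }
}
```

Doing the lookup in the constructor means a class without a suitable constructor is rejected as soon as channel(...) is called, rather than later when the first instance is needed.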
childHandler(new ChannelInitializer() {})

```java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    private volatile ChannelHandler childHandler;

    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        this.childHandler = (ChannelHandler)ObjectUtil.checkNotNull(childHandler, "childHandler");
        return this;
    }

}
```
This is just a field assignment. The interesting part is the ChannelHandler that gets created. ChannelInitializer declares an abstract initChannel method:

```java
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    protected abstract void initChannel(C var1) throws Exception;

}
```

The anonymous subclass in the server code implements it:

```java
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
    nioSocketChannel.pipeline().addLast(new StringDecoder());
    nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
            System.out.println(s);
        }
    });
}
```

```java
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {

    protected abstract void channelRead0(ChannelHandlerContext var1, I var2) throws Exception;

}
```
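The excerpts only show the abstract hooks, so here is a rough plain-Java sketch of how a typed hook such as channelRead0 can sit behind an untyped channelRead(Object). It is a simplification under assumptions: all class names are hypothetical, the message type is passed in explicitly as a Class object, and non-matching messages are simply ignored, which is not how Netty itself decides or forwards them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical untyped base handler: receives every message as Object.
abstract class SketchInboundHandler {
    void channelRead(Object msg) {
        // default: do nothing
    }
}

// Hypothetical typed handler: only messages of type I reach channelRead0.
abstract class SketchTypedHandler<I> extends SketchInboundHandler {
    private final Class<I> type;

    SketchTypedHandler(Class<I> type) {
        this.type = type;
    }

    @Override
    final void channelRead(Object msg) {
        if (type.isInstance(msg)) {
            channelRead0(type.cast(msg)); // template method: subclass sees a typed value
        }
        // Messages of other types are ignored in this sketch.
    }

    protected abstract void channelRead0(I msg);
}

// Concrete handler that records the strings it receives.
class SketchStringHandler extends SketchTypedHandler<String> {
    final List<String> seen = new ArrayList<>();

    SketchStringHandler() {
        super(String.class);
    }

    @Override
    protected void channelRead0(String msg) {
        seen.add(msg);
    }
}

class TypedHandlerDemo {
    public static void main(String[] args) {
        SketchStringHandler handler = new SketchStringHandler();
        handler.channelRead("hello"); // matches String, reaches channelRead0
        handler.channelRead(42);      // not a String, skipped
        System.out.println(handler.seen); // prints "[hello]"
    }
}
```

This is why the server's pipeline puts StringDecoder first: the String-typed handler only does useful work once the incoming bytes have been turned into String messages.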
Creating the Netty client

The code that creates the client is as follows:

```java
public static void main(String[] args) throws InterruptedException {
    Bootstrap bootstrap = new Bootstrap();
    NioEventLoopGroup group = new NioEventLoopGroup();

    bootstrap.group(group)
            .channel(NioSocketChannel.class)
            // explicit type argument: the diamond on an anonymous class needs Java 9+
            .handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel channel) throws Exception {
                    channel.pipeline().addLast(new StringEncoder());
                }
            });

    // sync() waits for the connect to finish, so the first write does not hit an unconnected channel
    Channel channel = bootstrap.connect("127.0.0.1", 8000).sync().channel();

    while (true) {
        channel.writeAndFlush(new Date() + ": hello world");
        Thread.sleep(3000);
    }
}
```
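Here is the first difference between ServerBootstrap and Bootstrap: Bootstrap has no group method of its own. It calls the parent class's group directly, which means the group passed in is assigned straight to the EventLoopGroup field of AbstractBootstrap.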
The rest of the code is much the same as on the server side. The main things to look at are writeAndFlush and connect.
writeAndFlush

```java
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    private final DefaultChannelPipeline pipeline;

    public ChannelFuture writeAndFlush(Object msg) {
        return this.pipeline.writeAndFlush(msg);
    }
}
```

```java
public class DefaultChannelPipeline implements ChannelPipeline {

    final AbstractChannelHandlerContext tail;

    public final ChannelFuture writeAndFlush(Object msg) {
        return this.tail.writeAndFlush(msg);
    }
}
```
```java
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {

    final EventExecutor executor;
    private final DefaultChannelPipeline pipeline;

    public ChannelFuture writeAndFlush(Object msg) {
        return this.writeAndFlush(msg, this.newPromise());
    }

    public ChannelPromise newPromise() {
        return new DefaultChannelPromise(this.channel(), this.executor());
    }

    public Channel channel() {
        return this.pipeline.channel();
    }

    public EventExecutor executor() {
        return (EventExecutor)(this.executor == null ? this.channel().eventLoop() : this.executor);
    }

    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        this.write(msg, true, promise);
        return promise;
    }

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        ObjectUtil.checkNotNull(msg, "msg");

        try {
            if (this.isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                return;
            }
        } catch (RuntimeException var8) {
            ReferenceCountUtil.release(msg);
            throw var8;
        }

        // Decompiled constants: 32768 (shown by the decompiler as the char '耀', U+8000) and 98304.
        // They appear to correspond to MASK_WRITE and MASK_WRITE | MASK_FLUSH in the Netty source.
        AbstractChannelHandlerContext next = this.findContextOutbound(flush ? 98304 : 32768);
        Object m = this.pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            // Already on the event loop thread: invoke the next handler directly.
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            // Called from another thread: hand the write over to the event loop as a task.
            AbstractChannelHandlerContext.WriteTask task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise, flush);
            if (!safeExecute(executor, task, promise, m, !flush)) {
                task.cancel();
            }
        }

    }
}
```
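The last branch of write is the part worth isolating: if the caller is already on the event loop thread the write is invoked directly, otherwise it is queued as a task for that thread. Below is a minimal plain-Java sketch of that dispatch, assuming a single-threaded executor as the "event loop". SketchEventLoop and SketchContext are hypothetical names, and promises, the pipeline walk and flushing are all left out.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical single-threaded "event loop".
final class SketchEventLoop {
    private volatile Thread loopThread;
    private final ExecutorService executor;

    SketchEventLoop() {
        executor = Executors.newSingleThreadExecutor(task -> {
            Thread t = new Thread(task, "sketch-event-loop");
            t.setDaemon(true);
            loopThread = t; // remember which thread is the loop thread
            return t;
        });
    }

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    CompletableFuture<Void> execute(Runnable task) {
        return CompletableFuture.runAsync(task, executor);
    }

    void shutdown() {
        executor.shutdown();
    }
}

// Hypothetical context: every write ends up running on the loop thread.
final class SketchContext {
    final List<String> written = new CopyOnWriteArrayList<>();
    private final SketchEventLoop loop;

    SketchContext(SketchEventLoop loop) {
        this.loop = loop;
    }

    CompletableFuture<Void> writeAndFlush(String msg) {
        if (loop.inEventLoop()) {
            invokeWriteAndFlush(msg); // already on the loop thread: call directly
            return CompletableFuture.completedFuture(null);
        }
        return loop.execute(() -> invokeWriteAndFlush(msg)); // other thread: queue a task
    }

    private void invokeWriteAndFlush(String msg) {
        written.add(Thread.currentThread().getName() + ": " + msg);
    }
}

class WriteDispatchDemo {
    public static void main(String[] args) {
        SketchEventLoop loop = new SketchEventLoop();
        SketchContext ctx = new SketchContext(loop);

        ctx.writeAndFlush("from main").join();                         // queued from the main thread
        loop.execute(() -> ctx.writeAndFlush("from the loop")).join(); // direct call on the loop thread

        System.out.println(ctx.written);
        loop.shutdown();
    }
}
```

Either way the actual write runs on the loop thread, which is how the client's main thread can call channel.writeAndFlush in a loop while the channel's handlers still execute on a single thread.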