Dubbo中Netty使用形式源码解读
先从Provider的角度讲,后从Consumer的角度讲,最后合起来讲共同和不同点。
每一个Dubbo接口作为Provider时,都会对外暴露Invoker,执行Export方法:
public class DubboProtocol extends AbstractProtocol {
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 创建 DubboExporter 对象,并添加到 `exporterMap` 。
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
......
// 启动服务器
openServer(url);
// 初始化序列化优化器
optimizeSerialization(url);
return exporter;
}
/** * 启动通信服务器 * * @param url URL */
private void openServer(URL url) {
// find server, ip:port
String key = url.getAddress();
//client 也可以暴露一个只有server可以调用的服务。
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); // isserver
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
}
在此期间需要实例化Server,多个不同的Provider,如果端口相同,那么会共用一个Server(NettyServer),也就是只实例化一次。
public class NettyServer extends AbstractServer implements Server {
@Override
protected void doOpen() {
// 设置日志工厂
NettyHelper.setNettyLoggerFactory();
// 实例化 ServerBootstrap
bootstrap = new ServerBootstrap();
// 创建线程组
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
// 创建 NettyServerHandler 对象
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
// 设置 `channels` 属性
channels = nettyServerHandler.getChannels();
bootstrap
// 设置它的线程组
.group(bossGroup, workerGroup)
// 设置 Channel类型
.channel(NioServerSocketChannel.class) // Server
// 设置可选项
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// 设置Handler链路
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
// 创建 NettyCodecAdapter 对象
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder()) // 解码
.addLast("encoder", adapter.getEncoder()) // 编码
.addLast("handler", nettyServerHandler); // 处理器
}
});
// 服务器绑定端口监听
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
}
Netty的Reactor模型和ChannelPipeline模型网上很多,这里就不列举了。
上述代码实例化Netty的Reactor模型相关的对象,默认使用NioSocketChannel(Selector)作为数据传输通道,按顺序依次添加日志,解码,编码,业务处理的Handler。
Provider的Server在接收到Consumer(Client)的请求数据后,先通过解码Handler进行以下处理:
- 首先通过Dubbo协议的数据格式来对TCP/IP数据进行拆包
- 将拆包得到的数据进行反序列化,比如Hession,Json,Thrift,ProtoBuf等,得到Java DTO
- 将DTO以及Invoker的相关信息封装为一个Task,投递到执行线程池中,最终调用Dubbo接口
- 将业务结果通过序列化工具序列化为可传输的字节数组,然后根据传输协议(Dubbo)组装成相应的数据格式,写入Channel
1.数据拆包及获取传输的Body:
public class ExchangeCodec extends TelnetCodec {
// header length.
protected static final int HEADER_LENGTH = 16;
// magic header.
protected static final short MAGIC = (short) 0xdabb;
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
// 读取 Header 数组,Header长度为16字节
int readable = buffer.readableBytes();
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
buffer.readBytes(header);
// 解码
return decode(channel, buffer, readable, header);
}
/** * @param channel * @param buffer * @param readable 可读取长度 * @param header 数据头部 * @return * @throws IOException */
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// 非 Dubbo 协议,目前是 Telnet 命令。
// 通过MAGIC判断请求数据是否为Dubbo协议
if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) {
// 将 buffer 完全复制到 `header` 数组中。因为,上面的 `#decode(channel, buffer)` 方法,可能未读全
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
// 【TODO 8026 】header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW ?
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
// 提交给父类( Telnet ) 处理,目前是 Telnet 命令。
return super.decode(channel, buffer, readable, header);
}
// Header 长度不够,返回需要更多的输入
// check length.
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
// `[96 - 127]`:Body 的**长度**。通过该长度,读取 Body 。
// get data length.
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
// 总长度不够,返回需要更多的输入
int tt = len + HEADER_LENGTH;
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}
// 解析 Header + Body
// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
return decodeBody(channel, is, header);
} finally {
// skip 未读完的流,并打印错误日志
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
}
每次请求的数据都严格按照指定协议组装,Header+Body。Header16个字节长度,通过Header的数据结构来判断Channel里读取的数据是不是一次请求的完整数据。如果不够,那么返回,等待此Channel的下次数据读取;如果够了,那么将ByteBufer里指定Body长度的数据提交给序列化器反序列化为Java DTO。
2.将Body反序列化为Java DTO:
public class ExchangeCodec extends TelnetCodec {
/** * 解析,返回 Request 或 Response * * @param channel 通道 * @param is 输出 * @param header Header * @return 结果 * @throws IOException 当发生 IO 异常时 */
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
ObjectInput in = s.deserialize(channel.getUrl(), is);
// Response
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) { // Response
// decode response.
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(Response.HEARTBEAT_EVENT);
}
// get status.
byte status = header[3];
res.setStatus(status);
if (status == Response.OK) {
try {
Object data;
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
} else if (res.isEvent()) {
data = decodeEventData(channel, in);
} else {
data = decodeResponseData(channel, in, getRequestData(id)); // `#getRequestData(id)` 的调用,是多余的
}
res.setResult(data);
} catch (Throwable t) {
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
} else {
res.setErrorMessage(in.readUTF());
}
return res;
// Request
} else { // Request
// decode request.
Request req = new Request(id);
req.setVersion("2.0.0");
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) { // 心跳事件
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
Object data;
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
} else if (req.isEvent()) {
data = decodeEventData(channel, in);
} else {
data = decodeRequestData(channel, in);
}
req.setData(data);
} catch (Throwable t) {
// bad request
req.setBroken(true);
req.setData(t);
}
return req;
}
}
}
根据Header里的第三个字节来判断当前数据是请求还是响应(因为Server和Client共用一套数据处理流程),实例化响应的Request,Response对象,将Header的第四个字节Status存入返回对象中,将第4-12个字节组成的RequestId存入返回对象,将Body数据通过反序列化得到的DTO存入返回对象。RequestId是一次请求的唯一标识,Client通过此标识来将Server返回的数据映射到之前发送的请求上。
3.对于Server,异步形式处理请求数据
public class AllChannelHandler extends WrappedChannelHandler {
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
......
}
}
}
4.调用Dubbo接口,将请求结果反序列化,然后按照协议格式组装数据写入Channel:
调用Dubbo接口的代码在 DubboProtocol#requestHandler里,代码很容易看明白,这里就不列举了
编码的Handler执行顺序在业务Handler写入数据之后,其会请求结果序列化,然后按照协议格式数据
public class ExchangeCodec extends TelnetCodec {
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) { // 请求
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) { // 响应
encodeResponse(channel, buffer, (Response) msg);
} else { // 提交给父类( Telnet ) 处理,目前是 Telnet 命令的结果。
super.encode(channel, buffer, msg);
}
}
/** * 编码响应 * * @param channel 通道 * @param buffer Buffer * @param res 响应 * @throws IOException 当发生 IO 异常时 */
protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
int savedWriteIndex = buffer.writerIndex();
try {
Serialization serialization = getSerialization(channel);
// `[0, 15]`:Magic Number
// 构建Header
byte[] header = new byte[HEADER_LENGTH];
// set magic number.
Bytes.short2bytes(MAGIC, header);
// `[16, 20]`:Serialization 编号 && `[23]`:响应。
// set request and serialization flag.
header[2] = serialization.getContentTypeId();
// `[21]`:`event` 是否为事件。
if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
// `[24 - 31]`:`status` 状态。
// set response status.
byte status = res.getStatus();
header[3] = status;
// `[32 - 95]`:`id` 编号,Long 型。
// set request id.
Bytes.long2bytes(res.getId(), header, 4);
// 编码 `Request.data` 到 Body ,并写入到 Buffer
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); // 序列化 Output
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
// encode response data or error message.
if (status == Response.OK) {
if (res.isHeartbeat()) {
encodeHeartbeatData(channel, out, res.getResult());
} else {
encodeResponseData(channel, out, res.getResult());
}
} else {
out.writeUTF(res.getErrorMessage());
}
// 释放资源
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
// 检查 Body 长度,是否超过消息上限。
int len = bos.writtenBytes();
checkPayload(channel, len);
// `[96 - 127]`:Body 的**长度**。
Bytes.int2bytes(len, header, 12);
// 写入 Header 到 Buffer
// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
} catch (Throwable t) {
......
}
}
}
在Provider端的Netty的作用流程基本就如上述所示,下面将Consumer端相对于Provider端不同的作用流程。
每个接口的Consumer在初始化时都会实例化一个NettyClient,也就是实例化一个Netty的Bootstrap。如果按照传统思路,每个Bootstrap关联一个EventLoopGroup,内部持有一个Selector,那么每个接口Consumer实例都会有一个Selector,这会造成资源的很大浪费,所以Dubbp定义了一个EventLoopGroup类型的常量,让所有Consumer在实例化时都用同一个EventLoopGroup,也就是一个Selector处理所有的Consumer的Channel:
public class NettyClient extends AbstractClient {
private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));
protected void doOpen() {
// 设置日志工厂
NettyHelper.setNettyLoggerFactory();
// 创建 NettyClientHandler 对象
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
// 实例化 ServerBootstrap
bootstrap = new Bootstrap();
bootstrap
// 设置它的线程组
.group(nioEventLoopGroup)
// 设置可选项
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
// 设置 Channel类型
.channel(NioSocketChannel.class);
// 设置连接超时时间
if (getTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
}
// 设置责Handler链路
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) {
// 创建 NettyCodecAdapter 对象
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder()) // 解码
.addLast("encoder", adapter.getEncoder()) // 编码
.addLast("handler", nettyClientHandler); // 处理器
}
});
}
}
Consumer相比于Provider还有一点不同:
Provider会使用线程池异步的处理Consumer发送的请求,处理完成后将相应数据写入Channel;
Consumer在大部分情况下都是发送同步请求,也就是会阻塞当前线程,直到Provider返回数据,或者阻塞超时。
public class DubboInvoker<T> extends AbstractInvoker<T> {
protected Result doInvoke(final Invocation invocation) {
RpcInvocation inv = (RpcInvocation) invocation;
// 获得方法名
final String methodName = RpcUtils.getMethodName(invocation);
// 获得 `path`( 服务名 ),`version`
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
// 获得 ExchangeClient 对象
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
// 远程调用
try {
// 获得是否异步调用
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
// 获得是否单向调用
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// 获得超时时间
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 单向调用
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
// 异步调用
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
// 同步调用
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
如果是同步调用的情况下,会将调用相关信息存入一个DefaultFuture中,然后阻塞,等待Provider返回信息,依据requestId来唤醒当前线程:
public class DefaultFuture implements ResponseFuture {
/** * 通道集合 * * key:请求编号 */
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
/** * Future 集合 * * key:请求编号 */
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
public DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
/** * 阻塞式的等待结果 * * @param timeout * @return * @throws RemotingException */
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
// 若未完成,等待
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
// 等待完成或超时
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
// 未完成,抛出超时异常 TimeoutException
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
// 返回响应
return returnFromResponse();
}
}
Consumer解析处Provider传递回来的数据,每解析成功一个返回结果,如果此次请求时同步请求,就会通过Condition唤醒相应requestId的请求线程,此时同步请求才会有结果返回。
/** * 完成 Condition */
private final Condition done = lock.newCondition();
/** * 接收响应( Response ) * * @param channel 通道 * @param response 响应 */
public static void received(Channel channel, Response response) {
try {
// 移除 FUTURES
DefaultFuture future = FUTURES.remove(response.getId());
// 接收结果
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
// 移除 CHANNELS
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
// 锁定
lock.lock();
try {
// 设置结果
response = res;
// 通知,唤醒等待
if (done != null) {
done.signal();
}
} finally {
// 释放锁定
lock.unlock();
}
// 调用回调
if (callback != null) {
invokeCallback(callback);
}
}
Consumer的拆包,序列化,协议格式数据组装都和Provider都是相同的,每次请求都会有一个唯一标识requestId来匹配请求和响应。请求在Provider端使用线程池异步执行,在Consumer端可同步,异步,Oneway形式,如果是同步则阻塞线程,等待指定requestId的响应返回来唤醒线程。
还没有评论,来说两句吧...