博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java高并发设计(十五)-- netty通信之全部
阅读量:6087 次
发布时间:2019-06-20

本文共 5697 字,大约阅读时间需要 18 分钟。

hot3.png

接着上面的文章我们继续讲,netty的其他功能都是模板式的开发和配置,我们需要定制化开发的功能模块就是序列化、反序列,消息句柄。

1,序列化、反序列

首先定义我们的超类接口,定义需要的核心功能

public interface Serialization {    //序列化得主操作,将对象转变成字节数组    byte[] serializ(Object object) throws IOException;    //反序列化,将字节数组转换为相应的对象    
T deserializ(byte[] bytes, Class
clazz) throws IOException; Object deserializ(byte[] bytes) throws IOException;}

再者我们采用的是hession的序列化操作,对相关的对象进行序列化操作

public class Hession2Serialization implements Serialization {    /**     * hession的序列化操作,将对象转化成字节码     * @param object     * @return     * @throws IOException     */    public byte[] serializ(Object object) throws IOException {        ByteArrayOutputStream baos = new ByteArrayOutputStream();        Hessian2Output hession = new Hessian2Output(baos);        hession.writeObject(object);        hession.flush();        return baos.toByteArray();    }    /**     * hession的反序列化,将字节转换成对象,并且是指定类型的对象     * @param bytes     * @param clazz     * @param 
* @return * @throws IOException */ public
T deserializ(byte[] bytes, Class
clazz) throws IOException { Hessian2Input hession = new Hessian2Input(new ByteArrayInputStream(bytes)); return (T) hession.readObject(clazz); } /** * hession的反序列化实现 * @param bytes * @return * @throws IOException */ public Object deserializ(byte[] bytes) throws IOException { Hessian2Input hession = new Hessian2Input(new ByteArrayInputStream(bytes)); return hession.readObject(); }}

然后我们对功能的封装,保证该功能的公用

public class HessionMessageCodec implements MessageCodec {    /**     * 编码,后期可以扩展自定义的协议     * @param byteBuf     * @param message     * @throws IOException     */    @Override    public void encode(final ByteBuf byteBuf, final Object message) throws IOException {        Hession2Serialization serialization = new Hession2Serialization();        byte[] bytes = serialization.serializ(message);        int bodyLength = bytes.length;        //消息体的长度        byteBuf.writeInt(bodyLength);        //消息体的内容        byteBuf.writeBytes(bytes);    }    /**     * 解码     * @param bytes     * @return     * @throws IOException     */    @Override    public Object decode(byte[] bytes) throws IOException {        Hession2Serialization serialization = new Hession2Serialization();        return serialization.deserializ(bytes);    }}

最后我们就是针对netty中的编码和解码,和netty进行实际功能调用的

第一我们设计的是顶层的功能代码

public class MessageEncoder extends MessageToByteEncoder {    private MessageCodec messageCodec;    public MessageEncoder(MessageCodec messageCodec) {        this.messageCodec = messageCodec;    }    /**     * 编码的核心实现,该方法采用不同的编码核心类注入来实现编码     * @param channelHandlerContext     * @param obj     * @param byteBuf     * @throws Exception     */    @Override    protected void encode(final ChannelHandlerContext channelHandlerContext, final Object obj, final ByteBuf byteBuf) throws Exception {        messageCodec.encode(byteBuf, obj);    }}
public class MessageDecoder extends ByteToMessageDecoder {    private MessageCodec messageCodec;    public MessageDecoder(MessageCodec messageCodec) {        this.messageCodec = messageCodec;    }    /**     * 解码功能的核心实现     * @param channelHandlerContext     * @param byteBuf     * @param list     * @throws Exception     */    @Override    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception {        if (byteBuf.readableBytes() < FinalStaticUtil.HEADER_LENGTH) {            return;        }        //从初始位置开始读取数据        byteBuf.markReaderIndex();        int bodyLength = byteBuf.readInt();        if (bodyLength < 0) {            channelHandlerContext.close();        }        //后面的消息长度是都和传递的消息长度是否一致        if (byteBuf.readableBytes() < bodyLength) {            byteBuf.resetReaderIndex();            return;        }        //读取实际数据        byte[] bytes = new byte[bodyLength];        byteBuf.readBytes(bytes);        try {            //调用真实的解码方法,将字节转换成相应的对象            Object obj = messageCodec.decode(bytes);            list.add(obj);        } catch (Exception e) {        }    }}

最后才是和netty直接通信的编码解码

public class HessionMessageEncoder extends MessageEncoder {    public HessionMessageEncoder(MessageCodec messageCodec) {        super(messageCodec);    }}
public class HessionMessageDecoder extends MessageDecoder {    public HessionMessageDecoder (MessageCodec messageCodec) {        super(messageCodec);    }}

2,通信句柄

public class PullMessageHander extends ChannelInboundHandlerAdapter {    private ConcurrentHashMap
rpcBeanMap; public PullMessageHander (ConcurrentHashMap
rpcBeanMap) { this.rpcBeanMap = rpcBeanMap; } /** * 读取客户端传递的数据 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RpcRequest rpcRequest = (RpcRequest) msg; //初始化返回结果 RpcResponse rpcResponse = new RpcResponse(); //将数据设计成线程的形式,保证操作的并发处理 PullMessageHandleTask handleTask = new PullMessageHandleTask(rpcRequest, rpcResponse, rpcBeanMap); //处理客户端传递的数据,并且将处理结果返回给客户端 PullExecuter.submit(handleTask, rpcRequest, rpcResponse, ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //super.exceptionCaught(ctx, cause); ctx.close(); }}

 

 

最后再说,其他的不说,附上项目的源码地址:

http://git.oschina.net/gnn-wsx/ones-monitor/tree/master/monitor-core?dir=1&filepath=monitor-core

该项目维护在开源中国的码云上,核心地址是http://git.oschina.net/gnn-wsx/ones-monitor,rpc的功能主要体现在monitor-core上

转载于:https://my.oschina.net/wangshuaixin/blog/865029

你可能感兴趣的文章
spark高级排序彻底解秘
查看>>
ylbtech-LanguageSamples-PartialTypes(部分类型)
查看>>
福建省促进大数据发展:变分散式管理为统筹集中式管理
查看>>
开发环境、生产环境、测试环境的基本理解和区别
查看>>
tomcat多应用之间如何共享jar
查看>>
Flex前后台交互,service层调用后台服务的简单封装
查看>>
MySQL入门12-数据类型
查看>>
Windows Azure 保留已存在的虚拟网络外网IP(云服务)
查看>>
修改字符集
查看>>
HackTheGame 攻略 - 第四关
查看>>
js删除数组元素
查看>>
带空格文件名的处理(find xargs grep ..etc)
查看>>
华为Access、Hybrid和Trunk的区别和设置
查看>>
centos使用docker下安装mysql并配置、nginx
查看>>
关于HTML5的理解
查看>>
需要学的东西
查看>>
Internet Message Access Protocol --- IMAP协议
查看>>
Linux 获取文件夹下的所有文件
查看>>
对 Sea.js 进行配置(一) seajs.config
查看>>
第六周
查看>>