接着上面的文章我们继续讲,netty的其他功能都是模板式的开发和配置,我们需要定制化开发的功能模块就是序列化、反序列,消息句柄。
1,序列化、反序列
首先定义我们的超类接口,定义需要的核心功能
public interface Serialization { //序列化得主操作,将对象转变成字节数组 byte[] serializ(Object object) throws IOException; //反序列化,将字节数组转换为相应的对象T deserializ(byte[] bytes, Class clazz) throws IOException; Object deserializ(byte[] bytes) throws IOException;}
再者我们采用的是hession的序列化操作,对相关的对象进行序列化操作
public class Hession2Serialization implements Serialization { /** * hession的序列化操作,将对象转化成字节码 * @param object * @return * @throws IOException */ public byte[] serializ(Object object) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); Hessian2Output hession = new Hessian2Output(baos); hession.writeObject(object); hession.flush(); return baos.toByteArray(); } /** * hession的反序列化,将字节转换成对象,并且是指定类型的对象 * @param bytes * @param clazz * @param* @return * @throws IOException */ public T deserializ(byte[] bytes, Class clazz) throws IOException { Hessian2Input hession = new Hessian2Input(new ByteArrayInputStream(bytes)); return (T) hession.readObject(clazz); } /** * hession的反序列化实现 * @param bytes * @return * @throws IOException */ public Object deserializ(byte[] bytes) throws IOException { Hessian2Input hession = new Hessian2Input(new ByteArrayInputStream(bytes)); return hession.readObject(); }}
然后我们对功能的封装,保证该功能的公用
public class HessionMessageCodec implements MessageCodec { /** * 编码,后期可以扩展自定义的协议 * @param byteBuf * @param message * @throws IOException */ @Override public void encode(final ByteBuf byteBuf, final Object message) throws IOException { Hession2Serialization serialization = new Hession2Serialization(); byte[] bytes = serialization.serializ(message); int bodyLength = bytes.length; //消息体的长度 byteBuf.writeInt(bodyLength); //消息体的内容 byteBuf.writeBytes(bytes); } /** * 解码 * @param bytes * @return * @throws IOException */ @Override public Object decode(byte[] bytes) throws IOException { Hession2Serialization serialization = new Hession2Serialization(); return serialization.deserializ(bytes); }}
最后我们就是针对netty中的编码和解码,和netty进行实际功能调用的
第一我们设计的是顶层的功能代码
public class MessageEncoder extends MessageToByteEncoder
public class MessageDecoder extends ByteToMessageDecoder { private MessageCodec messageCodec; public MessageDecoder(MessageCodec messageCodec) { this.messageCodec = messageCodec; } /** * 解码功能的核心实现 * @param channelHandlerContext * @param byteBuf * @param list * @throws Exception */ @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List
最后才是和netty直接通信的编码解码
public class HessionMessageEncoder extends MessageEncoder { public HessionMessageEncoder(MessageCodec messageCodec) { super(messageCodec); }}
public class HessionMessageDecoder extends MessageDecoder { public HessionMessageDecoder (MessageCodec messageCodec) { super(messageCodec); }}
2,通信句柄
public class PullMessageHander extends ChannelInboundHandlerAdapter { private ConcurrentHashMaprpcBeanMap; public PullMessageHander (ConcurrentHashMap rpcBeanMap) { this.rpcBeanMap = rpcBeanMap; } /** * 读取客户端传递的数据 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RpcRequest rpcRequest = (RpcRequest) msg; //初始化返回结果 RpcResponse rpcResponse = new RpcResponse(); //将数据设计成线程的形式,保证操作的并发处理 PullMessageHandleTask handleTask = new PullMessageHandleTask(rpcRequest, rpcResponse, rpcBeanMap); //处理客户端传递的数据,并且将处理结果返回给客户端 PullExecuter.submit(handleTask, rpcRequest, rpcResponse, ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //super.exceptionCaught(ctx, cause); ctx.close(); }}
最后再说,其他的不说,附上项目的源码地址:
http://git.oschina.net/gnn-wsx/ones-monitor/tree/master/monitor-core?dir=1&filepath=monitor-core
该项目维护在开源中国的码云上,核心地址是http://git.oschina.net/gnn-wsx/ones-monitor,rpc的功能主要体现在monitor-core上