博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty入门(4) - 附带的ChannelHandler和Codec
阅读量:4977 次
发布时间:2019-06-12

本文共 15778 字,大约阅读时间需要 52 分钟。

使用SSL/TLS创建安全的Netty程序

Java提供了抽象的SslContext和SslEngine,实际上SslContext可以用来获取SslEngine来进行加密和解密。Netty拓展了Java的SslEngine,称SslHandler,用来对网络数据进行加密和解密。

 1、制作自签证书

#keytool -genkey -keysize 2048 -validity 365 -keyalg RSA -dnam e "CN=gornix.com" -keypass 654321 -storepass 123456 -keystore gornix.jks

keytool为JDK提供的生成证书工具

  • -keysize 2048 密钥长度2048位(这个长度的密钥目前可认为无法被暴力破解)
  • -validity 365 证书有效期365天
  • -keyalg RSA 使用RSA非对称加密算法
  • -dname "CN=gornix.com" 设置Common Name为gornix.com,这是我的域名
  • -keypass 654321 密钥的访问密码为654321
  • -storepass 123456 密钥库的访问密码为123456(其实这两个密码也可以设置一样,通常都设置一样,方便记)
  • -keystore gornix.jks 指定生成的密钥库文件为gornix.jks

2、服务端程序

public class SocketServerHelper {        private static int WORKER_GROUP_SIZE = Runtime.getRuntime().availableProcessors() * 2;     private static EventLoopGroup bossGroup;     private static EventLoopGroup workerGroup;          private static Class
channelClass; public static void startSpiderServer() throws Exception { ServerBootstrap b = new ServerBootstrap(); b.childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(false)) .childOption(ChannelOption.SO_RCVBUF, 1048576) .childOption(ChannelOption.SO_SNDBUF, 1048576); bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(WORKER_GROUP_SIZE); channelClass = NioServerSocketChannel.class; System.out.println("workerGroup size:" + WORKER_GROUP_SIZE); System.out.println("preparing to start spider server..."); b.group(bossGroup, workerGroup); b.channel(channelClass); KeyManagerFactory keyManagerFactory = null; KeyStore keyStore = KeyStore.getInstance("JKS"); keyStore.load(new FileInputStream("G:\\ssl.jks"), "123456".toCharArray()); keyManagerFactory = KeyManagerFactory.getInstance("SunX509"); keyManagerFactory.init(keyStore,"123456".toCharArray()); SslContext sslContext = SslContextBuilder.forServer(keyManagerFactory).build(); b.childHandler(new SslChannelInitializer(sslContext)); b.bind(9912).sync(); System.out.println("spider server start sucess, listening on port " + 9912 + "."); } public static void main(String[] args) throws Exception { SocketServerHelper.startSpiderServer(); } public static void shutdown() { System.out.println("preparing to shutdown spider server..."); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); System.out.println("spider server is shutdown."); }}
public class SslChannelInitializer extends ChannelInitializer
{ private final SslContext context; public SslChannelInitializer(SslContext context) { this.context = context; } @Override protected void initChannel(Channel ch) throws Exception { SSLEngine engine = context.newEngine(ch.alloc()); engine.setUseClientMode(false); ch.pipeline().addFirst("ssl", new SslHandler(engine)); ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); //最大16M pipeline.addLast("decoder", new StringDecoder(Charset.forName("UTF-8"))); pipeline.addLast("encoder", new StringEncoder(Charset.forName("UTF-8"))); pipeline.addLast("spiderServerBusiHandler", new SpiderServerBusiHandler()); }}

3、客户端程序

public class SocketClientHelper {     public static void main(String[] args) {            Channel channel = SocketClientHelper.createChannel("localhost",9912);            try {                Thread.sleep(100);            } catch (InterruptedException e) {                e.printStackTrace();            }            SocketHelper.writeMessage(channel, "ssh over tcp test 1");            SocketHelper.writeMessage(channel, "ssh over tcp test 2");            SocketHelper.writeMessage(channel, "ssh over tcp test 3");            SocketHelper.writeMessage(channel, "ssh over tcp test 4");            SocketHelper.writeMessage(channel, "ssh over tcp test 5");        }                public static Channel createChannel(String host, int port) {            Channel channel = null;              Bootstrap b = getBootstrap();            try {                  channel = b.connect(host, port).sync().channel();                System.out.println(MessageFormat.format("connect to spider server ({0}:{1,number,#}) success for thread [" + Thread.currentThread().getName() + "].", host , port));            } catch (Exception e) {                e.printStackTrace();            }              return channel;        }                public static Bootstrap getBootstrap(){              EventLoopGroup group;            Class
channelClass = NioSocketChannel.class; group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group).channel(channelClass); b.option(ChannelOption.SO_KEEPALIVE, true); b.option(ChannelOption.TCP_NODELAY, true); b.option(ChannelOption.SO_REUSEADDR, true); b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); TrustManagerFactory tf = null; try { KeyStore keyStore = KeyStore.getInstance("JKS"); keyStore.load(new FileInputStream("G:\\ssl.jks"), "123456".toCharArray()); tf = TrustManagerFactory.getInstance("SunX509"); tf.init(keyStore); SslContext sslContext = SslContextBuilder.forClient().trustManager(tf).build(); b.handler(new SslChannelInitializer(sslContext)); return b; } catch(Exception e) { e.printStackTrace(); } return null; }}
public class SslChannelInitializer extends ChannelInitializer
{ private final SslContext context; public SslChannelInitializer(SslContext context) { this.context = context; } @Override protected void initChannel(Channel ch) throws Exception { SSLEngine engine = context.newEngine(ch.alloc()); engine.setUseClientMode(true); ch.pipeline().addFirst("ssl", new SslHandler(engine)); ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); //最大16M pipeline.addLast("decoder", new StringDecoder(Charset.forName("UTF-8"))); pipeline.addLast("encoder", new StringEncoder(Charset.forName("UTF-8"))); pipeline.addLast("spiderClientBusiHandler", new SpiderClientBusiHandler()); }}

可见SSL也没什么神秘的,就是在普通的TCP连接基础上包了一层处理而已(但如果要自己实现这层处理那可是相当复杂的),这层处理体现在Netty中就是一个SslHandler,把这个SslHandler加入到TCP连接的处理管线中即可。

PS:我们也可以使用基于认证和报文头加密的方式实现安全性。

 

处理空闲和超时

IdleStateHandler:当一个通道没有进行读写或者运行了一段时间后发出IdleStateEvent

ReadTimeoutHandler:在指定时间没没有接收到任何数据将抛出ReadTimeoutException

WriteTimeoutHandler:在指定时间内没有写入数据将抛出WriteTimeoutException

public class IdleStateHandlerInitializer extends ChannelInitializer
{ @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS)); pipeline.addLast(new HeartbeatHandler()); } public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter { private static final ByteBuf HEARTBEAT_SEQ = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8)); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { ctx.writeAndFlush(HEARTBEAT_SEQ.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { super.userEventTriggered(ctx, evt); } } }}

 

分隔符协议

DelimiterBasedFrameDecoder,解码器,接收ByteBuf由一个或者多个分隔符拆分,如NUL或者换行符。

LineBasedFrameDecoder,解码器,接收ByteBuf以分隔符结束,如"\n"和"\r\n"

public class LineBasedHandlerInitializer extends ChannelInitializer
{ @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(65*1024), new FrameHandler()); } public static final class FrameHandler extends SimpleChannelInboundHandler
{ @Override protected void channelRead0(ChannelHandlerContext arg0, ByteBuf arg1) throws Exception { // do something } }}

 

长度为基础的协议

FixedLengthFrameDecoder:解码器,固定长度提取帧

LengthFieldBasedFrameDecoder:解码器,读取头部长度并提取帧的长度

public class LengthBasedInitializer extends ChannelInitializer
{ @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65*1024, 0, 8)) .addLast(new FrameHandler()); } public static final class FrameHandler extends SimpleChannelInboundHandler
{ @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { // do something } }}

 

写大数据

写大量的数据的一个有效的方法就是使用异步框架,如果内存和网络都处于爆满负荷状态,你需要停止写,Netty提供zero-memory-copy机制,这种方法在将文件内容写到网络堆栈空间时可以获得最大的性能:

public class WriteBigData extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        File file = new File("");        FileInputStream fis = new FileInputStream(file);        FileRegion region = new DefaultFileRegion(fis.getChannel(), 0, file.length());        Channel channel = ctx.channel();        channel.writeAndFlush(region).addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                if (!future.isSuccess()) {                    Throwable cause = future.cause();                    // do something                }            }        });    }}

如果只想发送指定的数据块,可以使用ChunkedFile、ChunkedNioFile、ChunkedStream、ChunkedNioStream等。

 

Protobuf序列化传输

ProtobufDecoder、ProtobufEncoder、ProtobufVarint32FrameDecoder、ProtobufVarint32LengthPrepender,使用Protobuf需要映入protobuf-java-2.5.0.jar

1、下载编译器,将protoc.exe配置到环境变量:https://github.com/google/protobuf/releases

2、编写.proto文件,参考:https://blog.csdn.net/hry2015/article/details/70766603

syntax = "proto3"; // 声明可以选择protobuf的编译器版本(v2和v3)option java_outer_classname = "MessageProto"; // 指定生成的java类的类名message Message {  // 相当于c语言中的struct语句,表示定义一个信息,其实也就是类。  string id = 1; // 要传输的字段了,子段后面需要有一个数字编号,从1开始递增  string content = 2;}

3、CMD执行编译操作

protoc ./Message.proto --java_out=./

4、引入maven

com.google.protobuf
protobuf-java
3.5.1

5、服务端

public class ServerPoHandlerProto extends ChannelInboundHandlerAdapter {        @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {        MessageProto.Message message = (MessageProto.Message) msg;        if (ConnectionPool.getChannel(message.getId()) == null) {            ConnectionPool.putChannel(message.getId(), ctx);        }        System.err.println("server:" + message.getId());        ctx.writeAndFlush(message);    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        cause.printStackTrace();        ctx.close();    }}
bootstrap.group(bossGroup, workerGroup)                .channel(NioServerSocketChannel.class)                .childHandler(new ChannelInitializer
() { @Override public void initChannel(SocketChannel ch) throws Exception { // 实体类传输数据,protobuf序列化 ch.pipeline().addLast("decoder", new ProtobufDecoder(MessageProto.Message.getDefaultInstance())); ch.pipeline().addLast("encoder", new ProtobufEncoder()); ch.pipeline().addLast(new ServerPoHandlerProto()); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true);

6、客户端

public class ClientPoHandlerProto extends ChannelInboundHandlerAdapter {        @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {        MessageProto.Message message = (MessageProto.Message) msg;        System.out.println("client:" + message.getContent());    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        cause.printStackTrace();        ctx.close();    }    }
Bootstrap b = new Bootstrap();            b.group(workerGroup);            b.channel(NioSocketChannel.class);            b.option(ChannelOption.SO_KEEPALIVE, true);            b.handler(new ChannelInitializer
() { @Override public void initChannel(SocketChannel ch) throws Exception { // 实体类传输数据,protobuf序列化 ch.pipeline().addLast("decoder", new ProtobufDecoder(MessageProto.Message.getDefaultInstance())); ch.pipeline().addLast("encoder", new ProtobufEncoder()); ch.pipeline().addLast(new ClientPoHandlerProto()); } });

 

单元测试

package com.netty.learn.demo6;import java.util.List;import java.util.Random;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelPipeline;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.channel.embedded.EmbeddedChannel;import io.netty.handler.codec.ByteToMessageDecoder;public class EmbeddedChannelInboundTest {    public static void main(String[] args) {        Random r = new Random();        ByteBuf byteBuf = Unpooled.buffer();        for (int i = 0; i < 3; i++) {            int one = r.nextInt();            byteBuf.writeInt(one);            System.out.println("generate one: " + one);        }        EmbeddedChannel embeddedChannel = new EmbeddedChannel();        // 获取channelPipeLine        ChannelPipeline channelPipeline = embeddedChannel.pipeline();        channelPipeline.addFirst(new DecodeTest());        channelPipeline.addLast(new SimpleChannelInBoundHandlerTest());        // 写入测试数据        embeddedChannel.writeInbound(byteBuf);        embeddedChannel.finish();        // 验证测试数据        System.out.println("embeddedChannel readInbound:" + embeddedChannel.readInbound());        System.out.println("embeddedChannel readInbound:" + embeddedChannel.readInbound());        System.out.println("embeddedChannel readInbound:" + embeddedChannel.readInbound());    }}// 解码器class DecodeTest extends ByteToMessageDecoder {    @Override    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {        if (in.readableBytes() >= 4) {            out.add(in.readInt());        }    }}// channelHandler@SuppressWarnings("rawtypes")class SimpleChannelInBoundHandlerTest extends SimpleChannelInboundHandler {    @Override    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("Received message:" + msg);        ctx.fireChannelRead(msg);    }}

 

转载于:https://www.cnblogs.com/ijavanese/p/9931808.html

你可能感兴趣的文章
表单校验之datatype
查看>>
python第六篇文件处理类型
查看>>
ubuntu16系统磁盘空间/dev/vda1占用满的问题
查看>>
grid网格布局
查看>>
JSP常用标签
查看>>
九涯的第一次
查看>>
处理器管理与进程调度
查看>>
向量非零元素个数_向量范数详解+代码实现
查看>>
java if 用法详解_Java编程中的条件判断之if语句的用法详解
查看>>
matlab sin函数 fft,matlab的fft函数的使用教程
查看>>
mysql adddate()函数
查看>>
mysql sin() 函数
查看>>
单片机复位电路
查看>>
php json_decode失败,返回null
查看>>
3-day3-list-truple-map.py
查看>>
Edit控件显示多行文字
查看>>
JS第二周
查看>>
dataTable.NET的search box每輸入一個字母進行一次檢索的問題
查看>>
Python 文件处理
查看>>
邻接表详解
查看>>