今天我们来完成一个使用netty进行文件传输的任务。在实际项目中,文件传输通常采用FTP或者HTTP附件的方式。事实上通过TCP Socket File的方式进行文件传输也有一定的应用场景,尽管不是主流,但是掌握这种文件传输方式还是比较重要的,特别是针对两个跨主机的JVM进程之间进行持久化数据的相互交换。
而使用netty来进行文件传输也是利用netty天然的优势:零拷贝功能。很多同学都听说过netty的”零拷贝”功能,但是具体体现在哪里又不知道,下面我们就简要介绍下:
Netty的“零拷贝”主要体现在如下三个方面:
- Netty的接收和发送ByteBuffer采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才写入Socket中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
- Netty提供了组合Buffer对象,可以聚合多个ByteBuffer对象,用户可以像操作一个Buffer那样方便的对组合Buffer进行操作,避免了传统通过内存拷贝的方式将几个小Buffer合并成一个大的Buffer。
- Netty的文件传输采用了transferTo方法,它可以直接将文件缓冲区的数据发送到目标Channel,避免了传统通过循环write方式导致的内存拷贝问题。
具体的分析在此就不多做介绍,有兴趣的可以查阅相关文档。我们还是把重点放在文件传输上。Netty作为高性能的服务器端异步IO框架必然也离不开文件读写功能,我们可以使用netty模拟http的形式通过网页上传文件写入服务器,当然要使用http的形式那你也用不着netty!大材小用。
netty4中如果想使用http形式上传文件你还得借助第三方jar包:okhttp。使用该jar完成http请求的发送。但是在netty5 中已经为我们写好了,我们可以直接调用netty5的API就可以实现。
所以netty4和5的差别还是挺大的,至于使用哪个,那就看你们公司选择哪一个了!本文目前使用netty4来实现文件上传功能。下面我们上代码:
pom文件:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.5.Final</version>
</dependency>
server端:
importio.netty.bootstrap.ServerBootstrap;
importio.netty.channel.Channel;
importio.netty.channel.ChannelFuture;
importio.netty.channel.ChannelInitializer;
importio.netty.channel.ChannelOption;
importio.netty.channel.EventLoopGroup;
importio.netty.channel.nio.NioEventLoopGroup;
importio.netty.channel.socket.nio.NioServerSocketChannel;
importio.netty.handler.codec.serialization.ClassResolvers;
importio.netty.handler.codec.serialization.ObjectDecoder;
importio.netty.handler.codec.serialization.ObjectEncoder;
publicclassFileUploadServer{
publicvoidbind(intport)throwsException{
EventLoopGroupbossGroup=newNioEventLoopGroup();
EventLoopGroupworkerGroup=newNioEventLoopGroup();
try{
ServerBootstrapb=newServerBootstrap();
b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,1024).childHandler(newChannelInitializer<Channel>(){
@Override
protectedvoidinitChannel(Channelch)throwsException{
ch.pipeline().addLast(newObjectEncoder());
ch.pipeline().addLast(newObjectDecoder(Integer.MAX_VALUE,ClassResolvers.weakCachingConcurrentResolver(null)));//最大长度
ch.pipeline().addLast(newFileUploadServerHandler());
}
});
ChannelFuturef=b.bind(port).sync();
f.channel().closeFuture().sync();
}finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
publicstaticvoidmain(String[]args){
intport=8080;
if(args!=null&&args.length>0){
try{
port=Integer.valueOf(args[0]);
}catch(NumberFormatExceptione){
e.printStackTrace();
}
}
try{
newFileUploadServer().bind(port);
}catch(Exceptione){
e.printStackTrace();
}
}
}
importio.netty.bootstrap.Bootstrap;
importio.netty.channel.Channel;
importio.netty.channel.ChannelFuture;
importio.netty.channel.ChannelInitializer;
importio.netty.channel.ChannelOption;
importio.netty.channel.EventLoopGroup;
importio.netty.channel.nio.NioEventLoopGroup;
importio.netty.channel.socket.nio.NioSocketChannel;
importio.netty.handler.codec.serialization.ClassResolvers;
importio.netty.handler.codec.serialization.ObjectDecoder;
importio.netty.handler.codec.serialization.ObjectEncoder;
importjava.io.File;
publicclassFileUploadClient{
publicvoidconnect(intport,Stringhost,finalFileUploadFilefileUploadFile)throwsException{
EventLoopGroupgroup=newNioEventLoopGroup();
try{
Bootstrapb=newBootstrap();
b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true).handler(newChannelInitializer<Channel>(){
@Override
protectedvoidinitChannel(Channelch)throwsException{
ch.pipeline().addLast(newObjectEncoder());
ch.pipeline().addLast(newObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null)));
ch.pipeline().addLast(newFileUploadClientHandler(fileUploadFile));
}
});
ChannelFuturef=b.connect(host,port).sync();
f.channel().closeFuture().sync();
}finally{
group.shutdownGracefully();
}
}
publicstaticvoidmain(String[]args){
intport=8080;
if(args!=null&&args.length>0){
try{
port=Integer.valueOf(args[0]);
}catch(NumberFormatExceptione){
e.printStackTrace();
}
}
try{
FileUploadFileuploadFile=newFileUploadFile();
Filefile=newFile("c:/1.txt");
StringfileMd5=file.getName();//文件名
uploadFile.setFile(file);
uploadFile.setFile_md5(fileMd5);
uploadFile.setStarPos(0);//文件开始位置
newFileUploadClient().connect(port,"127.0.0.1",uploadFile);
}catch(Exceptione){
e.printStackTrace();
}
}
}
importjava.io.File;
importjava.io.Serializable;
publicclassFileUploadFileimplementsSerializable{
privatestaticfinallongserialVersionUID=1L;
privateFilefile;//文件
privateStringfile_md5;//文件名
privateintstarPos;//开始位置
privatebyte[]bytes;//文件字节数组
privateintendPos;//结尾位置
publicintgetStarPos(){
returnstarPos;
}
publicvoidsetStarPos(intstarPos){
this.starPos=starPos;
}
publicintgetEndPos(){
returnendPos;
}
publicvoidsetEndPos(intendPos){
this.endPos=endPos;
}
publicbyte[]getBytes(){
returnbytes;
}
publicvoidsetBytes(byte[]bytes){
this.bytes=bytes;
}
publicFilegetFile(){
returnfile;
}
publicvoidsetFile(Filefile){
this.file=file;
}
publicStringgetFile_md5(){
returnfile_md5;
}
publicvoidsetFile_md5(Stringfile_md5){
this.file_md5=file_md5;
}
}
输出为:
块儿长度:894
长度:8052
-----------------------------894
byte 长度:894
块儿长度:894
长度:7158
-----------------------------894
byte 长度:894
块儿长度:894
长度:6264
-----------------------------894
byte 长度:894
块儿长度:894
长度:5370
-----------------------------894
byte 长度:894
块儿长度:894
长度:4476
-----------------------------894
byte 长度:894
块儿长度:894
长度:3582
-----------------------------894
byte 长度:894
块儿长度:894
长度:2688
-----------------------------894
byte 长度:894
块儿长度:894
长度:1794
-----------------------------894
byte 长度:894
块儿长度:894
长度:900
-----------------------------894
byte 长度:894
块儿长度:894
长度:6
-----------------------------6
byte 长度:6
块儿长度:894
长度:0
-----------------------------0
文件已经读完--------0
Processfinishedwithexitcode0
这样就实现了服务器端文件的上传,当然我们也可以使用http的形式。
server端:
importio.netty.bootstrap.ServerBootstrap;
importio.netty.channel.ChannelFuture;
importio.netty.channel.EventLoopGroup;
importio.netty.channel.nio.NioEventLoopGroup;
importio.netty.channel.socket.nio.NioServerSocketChannel;
publicclassHttpFileServerimplementsRunnable{
privateintport;
publicHttpFileServer(intport){
super();
this.port=port;
}
@Override
publicvoidrun(){
EventLoopGroupbossGroup=newNioEventLoopGroup(1);
EventLoopGroupworkerGroup=newNioEventLoopGroup();
ServerBootstrapserverBootstrap=newServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
//serverBootstrap.handler(newLoggingHandler(LogLevel.INFO));
serverBootstrap.childHandler(newHttpChannelInitlalizer());
try{
ChannelFuturef=serverBootstrap.bind(port).sync();
f.channel().closeFuture().sync();
}catch(InterruptedExceptione){
e.printStackTrace();
}finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
publicstaticvoidmain(String[]args){
HttpFileServerb=newHttpFileServer(9003);
newThread(b).start();
}
}
Server端initializer:
importio.netty.channel.ChannelInitializer;
importio.netty.channel.ChannelPipeline;
importio.netty.channel.socket.SocketChannel;
importio.netty.handler.codec.http.HttpObjectAggregator;
importio.netty.handler.codec.http.HttpServerCodec;
importio.netty.handler.stream.ChunkedWriteHandler;
publicclassHttpChannelInitlalizerextendsChannelInitializer<SocketChannel>{
@Override
protectedvoidinitChannel(SocketChannelch)throwsException{
ChannelPipelinepipeline=ch.pipeline();
pipeline.addLast(newHttpServerCodec());
pipeline.addLast(newHttpObjectAggregator(65536));
pipeline.addLast(newChunkedWriteHandler());
pipeline.addLast(newHttpChannelHandler());
}
}
server端hadler:
importstaticio.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
importstaticio.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
importstaticio.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
importstaticio.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
importstaticio.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
importstaticio.netty.handler.codec.http.HttpVersion.HTTP_1_1;
importio.netty.buffer.Unpooled;
importio.netty.channel.ChannelFuture;
importio.netty.channel.ChannelFutureListener;
importio.netty.channel.ChannelHandlerContext;
importio.netty.channel.ChannelProgressiveFuture;
importio.netty.channel.ChannelProgressiveFutureListener;
importio.netty.channel.SimpleChannelInboundHandler;
importio.netty.handler.codec.http.DefaultFullHttpResponse;
importio.netty.handler.codec.http.DefaultHttpResponse;
importio.netty.handler.codec.http.FullHttpRequest;
importio.netty.handler.codec.http.FullHttpResponse;
importio.netty.handler.codec.http.HttpChunkedInput;
importio.netty.handler.codec.http.HttpHeaders;
importio.netty.handler.codec.http.HttpResponse;
importio.netty.handler.codec.http.HttpResponseStatus;
importio.netty.handler.codec.http.HttpVersion;
importio.netty.handler.codec.http.LastHttpContent;
importio.netty.handler.stream.ChunkedFile;
importio.netty.util.CharsetUtil;
importio.netty.util.internal.SystemPropertyUtil;
importjava.io.File;
importjava.io.FileNotFoundException;
importjava.io.RandomAccessFile;
importjava.io.UnsupportedEncodingException;
importjava.net.URLDecoder;
importjava.util.regex.Pattern;
importjavax.activation.MimetypesFileTypeMap;
publicclassHttpChannelHandlerextendsSimpleChannelInboundHandler<FullHttpRequest>{
publicstaticfinalStringHTTP_DATE_FORMAT="EEE,ddMMMyyyyHH:mm:sszzz";
publicstaticfinalStringHTTP_DATE_GMT_TIMEZONE="GMT";
publicstaticfinalintHTTP_CACHE_SECONDS=60;
@Override
protectedvoidchannelRead0(ChannelHandlerContextctx,FullHttpRequestrequest)throwsException{
//监测解码情况
if(!request.getDecoderResult().isSuccess()){
sendError(ctx,BAD_REQUEST);
return;
}
finalStringuri=request.getUri();
finalStringpath=sanitizeUri(uri);
System.out.println("get file:" path);
if(path==null){
sendError(ctx,FORBIDDEN);
return;
}
//读取要下载的文件
Filefile=newFile(path);
if(file.isHidden()||!file.exists()){
sendError(ctx,NOT_FOUND);
return;
}
if(!file.isFile()){
sendError(ctx,FORBIDDEN);
return;
}
RandomAccessFileraf;
try{
raf=newRandomAccessFile(file,"r");
}catch(FileNotFoundExceptionignore){
sendError(ctx,NOT_FOUND);
return;
}
longfileLength=raf.length();
HttpResponseresponse=newDefaultHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK);
HttpHeaders.setContentLength(response,fileLength);
setContentTypeHeader(response,file);
//setDateAndCacheHeaders(response,file);
if(HttpHeaders.isKeepAlive(request)){
response.headers().set("CONNECTION",HttpHeaders.Values.KEEP_ALIVE);
}
//Writetheinitiallineandtheheader.
ctx.write(response);
//Writethecontent.
ChannelFuturesendFileFuture=
ctx.write(newHttpChunkedInput(newChunkedFile(raf,0,fileLength,8192)),ctx.newProgressivePromise());
//sendFuture用于监视发送数据的状态
sendFileFuture.addListener(newChannelProgressiveFutureListener(){
@Override
publicvoidoperationProgressed(ChannelProgressiveFuturefuture,longprogress,longtotal){
if(total<0){//totalunknown
System.err.println(future.channel() "Transferprogress:" progress);
}else{
System.err.println(future.channel() "Transferprogress:" progress "/" total);
}
}
@Override
publicvoidoperationComplete(ChannelProgressiveFuturefuture){
System.err.println(future.channel() "Transfercomplete.");
}
});
//Writetheendmarker
ChannelFuturelastContentFuture=ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
//Decidewhethertoclosetheconnectionornot.
if(!HttpHeaders.isKeepAlive(request)){
//Closetheconnectionwhenthewholecontentiswrittenout.
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause){
cause.printStackTrace();
if(ctx.channel().isActive()){
sendError(ctx,INTERNAL_SERVER_ERROR);
}
ctx.close();
}
privatestaticfinalPatternINSECURE_URI=Pattern.compile(".*[<>&\"].*");
privatestaticStringsanitizeUri(Stringuri){
//Decodethepath.
try{
uri=URLDecoder.decode(uri,"UTF-8");
}catch(UnsupportedEncodingExceptione){
thrownewError(e);
}
if(!uri.startsWith("/")){
returnnull;
}
//Convertfileseparators.
uri=uri.replace('/',File.separatorChar);
//Simplisticdumbsecuritycheck.
//Youwillhavetodosomethingseriousintheproductionenvironment.
if(uri.contains(File.separator '.')||uri.contains('.' File.separator)||uri.startsWith(".")||uri.endsWith(".")
||INSECURE_URI.matcher(uri).matches()){
returnnull;
}
//Converttoabsolutepath.
returnSystemPropertyUtil.get("user.dir") File.separator uri;
}
privatestaticvoidsendError(ChannelHandlerContextctx,HttpResponseStatusstatus){
FullHttpResponseresponse=newDefaultFullHttpResponse(HTTP_1_1,status,Unpooled.copiedBuffer("Failure:" status "\r\n",CharsetUtil.UTF_8));
response.headers().set(CONTENT_TYPE,"text/plain;charset=UTF-8");
//Closetheconnectionassoonastheerrormessageissent.
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
/**
*SetsthecontenttypeheaderfortheHTTPResponse
*
*@paramresponse
*HTTPresponse
*@paramfile
*filetoextractcontenttype
*/
privatestaticvoidsetContentTypeHeader(HttpResponseresponse,Filefile){
MimetypesFileTypeMapm=newMimetypesFileTypeMap();
StringcontentType=m.getContentType(file.getPath());
if(!contentType.equals("application/octet-stream")){
contentType =";charset=utf-8";
}
response.headers().set(CONTENT_TYPE,contentType);
}
}
importjava.io.File;
importjava.io.FileOutputStream;
importio.netty.buffer.ByteBuf;
importio.netty.channel.ChannelHandlerContext;
importio.netty.channel.ChannelInboundHandlerAdapter;
importio.netty.handler.codec.http.HttpContent;
//importio.netty.handler.codec.http.HttpHeaders;
importio.netty.handler.codec.http.HttpResponse;
importio.netty.handler.codec.http.LastHttpContent;
importio.netty.util.internal.SystemPropertyUtil;
/**
*@Author:yangyue
*@Description:
*@Date:Createdin9:15on2017/5/28.
*/
publicclassHttpDownloadHandlerextendsChannelInboundHandlerAdapter{
privatebooleanreadingChunks=false;//分块读取开关
privateFileOutputStreamfOutputStream=null;//文件输出流
privateFilelocalfile=null;//下载文件的本地对象
privateStringlocal=null;//待下载文件名
privateintsuccCode;//状态码
publicHttpDownloadHandler(Stringlocal){
this.local=local;
}
@Override
publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)
throwsException{
if(msginstanceofHttpResponse){//response头信息
HttpResponseresponse=(HttpResponse)msg;
succCode=response.getStatus().code();
if(succCode==200){
setDownLoadFile();//设置下载文件
readingChunks=true;
}
//System.out.println("CONTENT_TYPE:"
// response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
}
if(msginstanceofHttpContent){//response体信息
HttpContentchunk=(HttpContent)msg;
if(chunkinstanceofLastHttpContent){
readingChunks=false;
}
ByteBufbuffer=chunk.content();
byte[]dst=newbyte[buffer.readableBytes()];
if(succCode==200){
while(buffer.isReadable()){
buffer.readBytes(dst);
fOutputStream.write(dst);
buffer.release();
}
if(null!=fOutputStream){
fOutputStream.flush();
}
}
}
if(!readingChunks){
if(null!=fOutputStream){
System.out.println("Downloaddone->" localfile.getAbsolutePath());
fOutputStream.flush();
fOutputStream.close();
localfile=null;
fOutputStream=null;
}
ctx.channel().close();
}
}
/**
*配置本地参数,准备下载
*/
privatevoidsetDownLoadFile()throwsException{
if(null==fOutputStream){
local=SystemPropertyUtil.get("user.dir") File.separator local;
//System.out.println(local);
localfile=newFile(local);
if(!localfile.exists()){
localfile.createNewFile();
}
fOutputStream=newFileOutputStream(localfile);
}
}
@Override
publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)
throwsException{
System.out.println("管道异常:" cause.getMessage());
cause.printStackTrace();
ctx.channel().close();
}
}
这里客户端我放的是网络连接,下载的是一副图片,启动服务端和客户端就可以看到这个图片被下载到了工程的根目录下。
最后来自小编的福利
小编整理了一份大厂真题的面试资料,以及2021最新Java核心技术整理的资料集锦,需要领取的小伙伴可以 私聊关注我 免费领取 ,编程的世界永远向所有热爱编程的人开放,这是一个自由,平等,共享的世界,我始终是这样坚信的。
喜欢小编的分享可以点赞关注哦,小编持续为你分享最新文章 和 福利领取哦