消息类型,暂时定为
{
1. Server_Ready_Send_File
2. Server_Sending_File
3. Server_End_Send_File
4. Client_Ready_Recv_File
}
1. Server 启动
2. Client 请求连接
3. Server - Client 连接成功创建
4. Server 在创建连接的句柄方法中,创建消息{类型: Server_Ready_Send_File , 数据:<long:文件大小(字节) , byte[] 文件名称>}
5. Client 在接收到该消息之后,便在指定的路径下面创建文件,且,打开文件流,准备写操作;准备工作做好了之后,便向 Server 发送消息 {类型:Client_Ready_Send_File, 数据:<0>}
6. Server 在接收到 Client 的消息之后,便将文件打开,并且执行循环,读取文件中的内容,将文件中的数据封装成消息 {类型:Server_Sending_File,数据<文件中的数据转换为二进制类型>}
循环按行读取文件,当得到结束标语'end of the file!' 的时候,便,将消息的类型转换为 Server_End_Send_File 。 然后,将该文件内容写入之后,发送消息。随后关闭掉与 client 的连接。
7. Client 在接收 Server 的 Server_Sending_File 之后,将其写入到打开的文件中,随后当接收到来自 Server_End_Send_File 类型的消息之后,便可以将该消息写入到文件中,
然后,该 flush() ,flush,该 close(), close . 关闭文件输入流之后,断开与 Server 之间的连接。
下面的代码经过测试,已经可以成功的分块传递大小为 17699324B 的文件,不过代码中仍旧有很多不太恰当,没有细化的地方。
{比如说传递的文件名称的统一,以及每次传递的文件字节数目在小于 (short = 2 byte = 16 bit 可以用来表示 2^15 B 大小的文件 - 3 byte )}
可以最大化传输的文件行数目。但是,这个项目代码暂时可以作为模板使用。
项目的结构与前一篇草稿篇介绍的一样,只不过,在这里我们并不适用 ClientHandlerListener , 而是直接将互相发送的代码写到各自的 Handler 中。
/src/
|----------common/
|-------- MessageType
|-------- Message
|-------- MessageDecoder
|-------- MessageEncoder
|---------- client/
|-------- Client
|-------- ClientHandler
|-------- ClientHandlerListener
|---------- server/
|-------- Server
|-------- ServerHandler
运行代码之前,建议在 /tmp/ 路径下面创建两个名称为 test1.txt ; test2.txt 的文件,
可以根据需要测试的数据量的大小来设置文件的大小,建议将两个文件设置一个超大,一个正常大小。 大文件用来测试程序运行过程中的稳定性,以及大数据量的发送情况。
common/MessageType.java
点击(此处)折叠或打开
-
package org.kylin.zhang.common;
-
-
-
-
/**
-
* Created by root on 6/30/15.
-
*/
-
public enum MessageType
-
{
-
READY_SEND_FILE((byte)0x01) ,
-
READY_RECV_FILE((byte)0x04) ,
-
FILE_SENDING((byte)0x02) ,
-
FILE_END((byte)0x03) ,
-
SHUT_DOWN((byte)0x00),
-
UNKNOWN ((byte)0x00) ;
-
-
-
private final byte b ;
-
-
private MessageType ( byte b )
-
{
-
this.b = b ;
-
}
-
-
-
public static MessageType fromByte ( byte b )
-
{
-
for (MessageType code : values())
-
{
-
if (code.b == b )
-
return code ;
-
}
-
-
return UNKNOWN ;
-
}
-
-
public byte getByte ()
-
{
-
return this.b ;
-
}
- }
common/Message.java
点击(此处)折叠或打开
-
package org.kylin.zhang.common;
-
-
import java.io.Serializable ;
-
-
/**
-
* Created by root on 6/30/15.
-
*/
-
-
public class Message implements Serializable
-
{
-
private MessageType type ;
-
private short length ;
-
private byte [] data ;
-
-
public Message ()
-
{}
-
-
public Message (MessageType type , short len , byte [] data )
-
{
-
this.type = type ;
-
this.length = len ;
-
this.data = data ;
-
}
-
-
public short getLength ()
-
{
-
return this.length ;
-
}
-
public void setLength ( short len )
-
{
-
this.length = len ;
-
}
-
-
-
public MessageType getType ()
-
{
-
return this.type ;
-
}
-
-
public void setType ( MessageType type )
-
{
-
this.type = type ;
-
}
-
-
public byte [] getData ()
-
{
-
return this.data ;
-
}
-
-
public void setData ( byte [] data )
-
{
-
this.data = data ;
-
}
-
-
-
@Override
-
public String toString ()
-
{
-
return "\nmessage type :"+this.type +"\n"
-
+"message length :"+this.length +"\n"
-
+"message content :"+ new String (this.data) +"\n" ;
-
}
-
-
-
-
- }
common/MessageDecoder.java / not changed
common/MessageEncoder.java / not changed
client/Client.java
点击(此处)折叠或打开
-
package org.kylin.zhang.client;
-
-
import org.kylin.zhang.common.* ;
-
import org.jboss.netty.bootstrap.ClientBootstrap ;
-
import org.jboss.netty.channel.ChannelFactory ;
-
import org.jboss.netty.channel.ChannelPipeline ;
-
import org.jboss.netty.channel.ChannelPipelineFactory ;
-
import org.jboss.netty.channel.Channels ;
-
import org.jboss.netty.channel.group.ChannelGroup ;
-
import org.jboss.netty.channel.group.DefaultChannelGroup ;
-
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory ;
-
-
import java.io.* ;
-
import java.net.InetSocketAddress ;
-
import java.util.concurrent.Executors ;
-
import java.util.concurrent.atomic.AtomicInteger ;
-
-
/**
-
* Created by root on 7/1/15.
-
*/
-
public class Client implements ClientHandlerListener
-
{
-
-
private final String host ;
-
private final int port ;
-
-
-
-
private ClientHandler handler ;
-
private ChannelFactory clientFactory ;
-
private ChannelGroup channelGroup ;
-
-
-
private String file_path ;
-
private String file_name ;
-
private long file_len ;
-
private BufferedReader input ;
-
-
public Client ( String host , int port )
-
{
-
this.host = host ;
-
this.port = port ;
-
-
// default received stored path
-
// make sure its existances before running the program
-
-
this.file_path = "/tmp/data/" ;
-
}
-
-
-
-
public void messageReceived (Message message)
-
{
-
// we not use it in this test
-
}
-
-
public boolean start ()
-
{
-
this.clientFactory = new NioClientSocketChannelFactory(
-
Executors.newCachedThreadPool(), Executors.newCachedThreadPool()) ;
-
-
-
this.channelGroup = new DefaultChannelGroup(this + "-channelGroup") ;
-
-
this.handler = new ClientHandler (this, this.channelGroup ) ;
-
-
ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
-
public ChannelPipeline getPipeline() throws Exception {
-
-
ChannelPipeline pipeline = Channels.pipeline() ;
-
pipeline.addLast("encoder" , MessageEncoder.getInstance()) ;
-
pipeline.addLast("decoder" , new MessageDecoder ()) ;
-
pipeline.addLast("handler" , handler) ;
-
-
-
return pipeline ;
-
}
-
} ;
-
-
ClientBootstrap bootstrap = new ClientBootstrap (this.clientFactory) ;
-
-
bootstrap.setOption("reuseAddress" , true ) ;
-
bootstrap.setOption ("tcpNoDealy" , true ) ;
-
bootstrap.setOption ("keepAlive" , true ) ;
-
bootstrap.setPipelineFactory(pipelineFactory ) ;
-
-
-
boolean connected = bootstrap.connect( new InetSocketAddress(host, port ))
-
.awaitUninterruptibly().isSuccess();
-
-
if ( !connected)
-
{
-
this.stop() ;
-
}
-
-
return connected ;
-
}
-
-
public void stop ()
-
{
-
if ( this.channelGroup != null )
-
this.channelGroup.close() ;
-
if ( this.clientFactory != null )
-
this.clientFactory.releaseExternalResources();
-
}
-
private void flood()
-
{
-
if ((this.channelGroup == null) || (this.clientFactory == null)) {
-
System.out.println("do not have any resources, return ");
-
return;
-
}
-
-
/// runnin only once
-
{
-
String data = "KyLin_Zhang" ;
-
-
Message msg = new Message(MessageType.FILE_SENDING ,(short)(data.getBytes().length+3), data.getBytes()) ;
-
// System.out.println("client flood :" + msg ) ;
-
-
this.handler.sendMessage(msg);
-
-
}
-
}
-
-
public static void main ( String [] args ) throws InterruptedException
-
{
-
final Client client = new Client("kylin", 9999) ;
-
-
-
-
if (!client.start())
-
{
-
System.out.println("client failed to start");
-
System.exit(-1);
-
return;
-
}
-
-
System.out.println("Client started ....") ;
-
-
// System.out.println("call flood") ;
-
// client.flood() ;
-
-
-
Runtime.getRuntime().addShutdownHook(
-
new Thread()
-
{
-
@Override
-
public void run ()
-
{
-
client.stop () ;
-
}
-
}) ;
-
}
- }
client/ClientHandler.java
点击(此处)折叠或打开
-
package org.kylin.zhang.client;
-
-
import org.jboss.netty.channel.*;
-
import org.jboss.netty.channel.group.ChannelGroup ;
-
-
import org.kylin.zhang.common.* ;
-
-
import java.io.*;
-
-
/**
-
* Created by root on 7/1/15.
-
*/
-
public class ClientHandler extends SimpleChannelUpstreamHandler
-
{
-
-
private final ClientHandlerListener listener ;
-
private final ChannelGroup channelGroup ;
-
private Channel channel ;
-
-
private String file_path ;
-
private String file_name ;
-
private long file_len ;
-
private BufferedWriter output ;
-
-
-
public ClientHandler(ClientHandlerListener listener , ChannelGroup channelGroup)
-
{
-
this.listener = listener ;
-
this.channelGroup = channelGroup ;
-
-
this.file_path = "/home/" ;
-
}
-
-
@Override
-
public void messageReceived ( ChannelHandlerContext ctx, MessageEvent e)
-
throws Exception
-
{
-
if ( e.getMessage () instanceof Message )
-
{
-
// output this
-
// this.listener.messageReceived((Message)e.getMessage()) ;
-
-
Message message = (Message)e.getMessage() ;
-
-
// print out message
-
-
// System.out.println("message received from server "+ message ) ;
-
// after receive message , create a new file and write the message content into it
-
-
-
// this.channelGroup.disconnect() ;
-
-
// in this branch , we first get the message type and then if else if it
-
// by different kinds of messages , we use different methods
-
// if - READY_SEND_FILE ---> we extract file name , and file len
-
// as static member values, and create the file at target path
-
// get its InputStream object , ready to write , and return the 'READY_RECV_FILE'
-
// message to the server
-
-
// if - FILE_SENDING ----> we extract file contents and write each line
-
// into the opened file
-
-
// if -- FILE_END ---> we write the last block of the message data , then
-
// we flush and then close the file
-
-
-
-
MessageType recv_msg_type = message.getType() ;
-
-
if ( recv_msg_type == MessageType.READY_SEND_FILE)
-
{
-
String data = new String (message.getData()) ;
-
-
file_name = data.substring( data.indexOf(':')+1) ;
-
-
-
this.file_len = Long.decode(data.substring(0 , data.lastIndexOf(':'))) ;
-
-
System.out.println ("we got file_name : "+ this.file_name ) ;
-
System.out.println ("we got file_len : "+this.file_len ) ;
-
-
// we create file and then open it , and then , create response message to server
-
-
File file = new File( this.file_path+this.file_name ) ;
-
-
if ( file.exists())
-
file.delete() ;
-
-
try
-
{
-
file.createNewFile();
-
this.output = new BufferedWriter( new FileWriter( file )) ;
-
}
-
catch (Exception ex )
-
{}
-
-
// here we create message of type of READY_RECV_FILE
-
-
data = "client get ready" ;
-
Message response_msg = new Message(MessageType.READY_RECV_FILE , (short)(3+data.getBytes().length),
-
data.getBytes()) ;
-
this.channel.write(response_msg) ;
-
}
-
-
if ( recv_msg_type == MessageType.FILE_SENDING)
-
{
-
String data = new String (message.getData()) ;
-
this.output.newLine();
-
this.output.write(data);
-
this.output.flush();
-
-
}
-
-
if ( recv_msg_type == MessageType.FILE_END)
-
{
-
String data = new String (message.getData()) ;
-
-
this.output.newLine();
-
this.output.write(data) ;
-
this.output.flush() ;
-
-
this.output.close() ;
-
-
// create message with type of SHUT_DOWN , after server received this it will
-
// disconnect the channel
-
-
data = "client gonning to shut down" ;
-
Message msg = new Message (MessageType.SHUT_DOWN , (short)(3+data.getBytes().length) ,
-
data.getBytes()) ;
-
-
e.getChannel().write(msg) ;
-
-
try
-
{
-
System.out.println("client going to shut down ......") ;
-
Thread.sleep(4000);
-
e.getChannel().disconnect() ;
-
}
-
catch ( Exception ex )
-
{}
-
}
-
-
}
-
else
-
{
-
super.messageReceived(ctx, e ) ;
-
}
-
-
}
-
-
@Override
-
public void channelConnected( ChannelHandlerContext ctx, ChannelStateEvent e)
-
throws Exception
-
{
-
System.out.println("client channelConnected: ") ;
-
this.channel = e.getChannel();
-
-
-
-
this.channelGroup.add(e.getChannel()) ;
-
-
}
-
-
public void sendMessage ( Message msg )
-
{
-
if ( this.channel != null )
-
{
-
System.out.println("sendMessage : "+ msg ) ;
-
this.channel.write(msg) ;
-
-
}
-
}
- }
client/ClientHandlerListener.java / not use
/ not use it , copy it from the previous blog is ok
server/Server.java
点击(此处)折叠或打开
-
package org.kylin.zhang.server;
-
-
import org.kylin.zhang.common.* ;
-
-
import org.jboss.netty.bootstrap.ServerBootstrap;
-
import org.jboss.netty.channel.Channel ;
-
import org.jboss.netty.channel.ChannelPipeline ;
-
import org.jboss.netty.channel.ChannelPipelineFactory ;
-
import org.jboss.netty.channel.Channels ;
-
import org.jboss.netty.channel.ServerChannelFactory ;
-
import org.jboss.netty.channel.group.DefaultChannelGroup;
-
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory ;
-
-
import java.net.InetSocketAddress ;
-
import java.util.concurrent.Executors ;
-
-
-
/**
-
* Created by root on 7/1/15.
-
*/
-
public class Server {
-
-
private final String host ;
-
private final int port ;
-
private DefaultChannelGroup channelGroup ;
-
private ServerChannelFactory serverFactory ;
-
-
-
-
public Server ( String host , int port )
-
{
-
this.host = host ;
-
this.port = port ;
-
-
}
-
-
public boolean start ()
-
{
-
this.serverFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
-
Executors.newCachedThreadPool()) ;
-
-
this.channelGroup = new DefaultChannelGroup (this + "-channelGroup") ;
-
-
ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory ()
-
{
-
public ChannelPipeline getPipeline() throws Exception
-
{
-
ChannelPipeline pipeline = Channels.pipeline() ;
-
pipeline.addLast ("encoder" , MessageEncoder.getInstance()) ;
-
pipeline.addLast("decoder" , new MessageDecoder() ) ;
-
pipeline.addLast("handler" , new ServerHandler(channelGroup)) ;
-
-
return pipeline ;
-
}
-
} ;
-
-
ServerBootstrap bootstrap = new ServerBootstrap(this.serverFactory) ;
-
bootstrap.setOption("reuseAddress" , true) ;
-
bootstrap.setOption ("child.tcpNoDelay" , true ) ;
-
bootstrap.setOption ("child.keepAlive" , true ) ;
-
bootstrap.setPipelineFactory(pipelineFactory) ;
-
-
-
Channel channel = bootstrap.bind( new InetSocketAddress(this.host, this.port)) ;
-
-
if ( !channel.isBound())
-
{
-
this.stop() ;
-
return false ;
-
}
-
-
this.channelGroup.add(channel) ;
-
return true ;
-
}
-
-
public void stop ()
-
{
-
if (this.channelGroup != null )
-
this.channelGroup.close () ;
-
if ( this.serverFactory != null )
-
this.serverFactory.releaseExternalResources();
-
}
-
-
-
public static void main ( String [] args )
-
{
-
final Server server = new Server("kylin", 9999 ) ;
-
-
if ( !server.start ())
-
{
-
System.out.println("server failed to run ") ;
-
System.exit(-1);
-
-
return ; // not really needed
-
}
-
-
System.out.println("server started ..... ") ;
-
-
Runtime.getRuntime().addShutdownHook( new Thread ()
-
{
-
@Override
-
public void run ()
-
{
-
server.stop() ;
-
}
-
});
-
}
-
-
- }
server/ServerHandler.java
点击(此处)折叠或打开
-
package org.kylin.zhang.server;
-
-
import org.kylin.zhang.common.* ;
-
import org.jboss.netty.channel.ChannelHandlerContext ;
-
import org.jboss.netty.channel.ChannelStateEvent ;
-
import org.jboss.netty.channel.MessageEvent ;
-
import org.jboss.netty.channel.SimpleChannelUpstreamHandler ;
-
import org.jboss.netty.channel.group.ChannelGroup ;
-
-
import java.io.BufferedReader;
-
import java.io.File;
-
import java.io.FileReader;
-
import java.util.HashMap;
-
import java.util.Iterator;
-
import java.util.Map;
-
-
/**
-
* Created by root on 7/1/15.
-
*/
-
public class ServerHandler extends SimpleChannelUpstreamHandler
-
{
-
private final ChannelGroup channelGroup ;
-
private Map<String,String> fileHash ;
-
-
public ServerHandler ( ChannelGroup channelGroup )
-
{
-
-
this.channelGroup = channelGroup ;
-
-
this.fileHash = new HashMap<String,String>() ;
-
-
// here we initialize the fileHash
-
fileHash.put("test1" , "/tmp/test1.txt") ;
-
fileHash.put("test2" , "/tmp/test2.txt") ;
-
}
-
-
@Override
-
public void channelConnected( ChannelHandlerContext ctx, ChannelStateEvent e )
-
{
-
System.out.println("-------------------------- **server gets a new connection**--------------------------") ;
-
this.channelGroup.add(e.getChannel()) ;
-
-
-
byte [] data = null ;
-
-
File file = new File ( fileHash.get("test1") ) ;
-
String len_fname = String.valueOf(file.length()) ;
-
len_fname += ":test1" ;
-
-
// file_length:file_name
-
-
data = len_fname.getBytes() ;
-
-
Message msg = new Message(MessageType.READY_SEND_FILE , (short)(3+data.length) ,data);
-
-
e.getChannel().write(msg) ;
-
-
// e.getChannel().disconnect() ;
-
}
-
-
@Override
-
public void messageReceived ( ChannelHandlerContext ctx, MessageEvent e )
-
throws Exception
-
{
-
-
if ( e.getMessage() instanceof Message ) {
-
-
-
Message received_msg = (Message) e.getMessage();
-
-
MessageType recv_msg_type = received_msg.getType () ;
-
-
System.out.println("server received message: " + received_msg);
-
-
if ( recv_msg_type == MessageType.READY_RECV_FILE)
-
{
-
// in this open the file and use while cycle send file data <=250 bytes to client
-
// if read the last line 'the end of the file!'
-
// change the response file state from SENDING_FILE to END_FILE
-
// and then close the channel
-
-
File f = new File (this.fileHash.get("test1")) ;
-
BufferedReader input = new BufferedReader( new FileReader(f) ) ;
-
-
String line ;
-
-
while(!( line = input.readLine()).equals("the end of file!"))
-
{
-
Message send_msg = new Message( MessageType.FILE_SENDING , (short)(3+line.getBytes().length) ,
-
line.getBytes()) ;
-
-
e.getChannel().write(send_msg) ;
-
}
-
-
if (line.equals("the end of file!"))
-
{
-
Message send_msg = new Message (MessageType.FILE_END, (short)(3+line.getBytes().length),
-
line.getBytes() ) ;
-
e.getChannel().write(send_msg) ;
-
}
-
-
}
-
else if ( recv_msg_type == MessageType.SHUT_DOWN)
-
{
-
e.getChannel().disconnect() ;
-
}
-
else
-
{
-
System.out.println ("UNKNOWN Message ") ;
-
}
-
-
-
}
-
else
-
{
-
super.messageReceived(ctx, e);
-
}
-
-
}
- }
运行结果:
先运行服务器端
(我在程序中,使用的是自己的主机别名创建的连接,根据不同的主机需要变动 hostname):
server started .....
-------------------------- **server gets a new connection**--------------------------
server received message:
message type :READY_RECV_FILE
message length :19
message content :client get ready
server received message:
message type :SHUT_DOWN
message length :30
message content :client gonning to shut down
在运行客户端:
Client started ....
client channelConnected:
we got file_name : test1
we got file_len : 17699324
client going to shut down ......
Process finished with exit code 0
end