点击(此处)折叠或打开
- import org.apache.log4j.Logger;
-
-
import java.io.FileNotFoundException;
-
import java.io.IOException;
-
import java.io.RandomAccessFile;
-
import java.io.UnsupportedEncodingException;
-
import java.util.ArrayList;
-
import java.util.List;
-
-
/**
-
* object: FileReadLineThread
-
* author: 程晓鹏
-
* description: 文件读取行线程类
-
* date: 2018-04-13 16:51
-
*/
-
public class FileReadLineThread extends Thread implements Closeable{
-
protected static final Logger log = Logger.getLogger(FileReadLineThread.class);
-
private ReadWriteBooleanLock isRun; //是否运行
-
private ReadWriteBooleanLock isBreakWhile; //是否跳出while循环
-
private IFileReadLineNotify readLineNotify;
-
private List<ReadFileInfo> files; //待处理文件集合
-
private long closeValidSleepTime; //关闭验证时休眠时间
-
private long closeStartTime; //关闭开始时间
-
-
/**
-
* 默认构造函数
-
*/
-
public FileReadLineThread(){
-
this.setName(this.getClass().getName().toString()); //设置线程名
-
this.files = new ArrayList<ReadFileInfo>();
-
this.isRun = new ReadWriteBooleanLock(true);
-
this.isBreakWhile = new ReadWriteBooleanLock(false);
-
this.closeValidSleepTime = 5;
-
this.closeStartTime = 0;
-
}
-
-
public void setReadLineNotify(IFileReadLineNotify readLineNotify) {
-
this.readLineNotify = readLineNotify;
-
}
-
-
private boolean isRun(){
-
return this.isRun.read().booleanValue();
-
}
-
-
private boolean isBreakWhile(){
-
return this.isBreakWhile.read().booleanValue();
-
}
-
-
@Override
-
public void run() {
-
exec();
-
}
-
-
/**
-
* 执行操作
-
*/
-
private void exec(){
-
while(this.isRun()){
-
this.exec0();
-
synchronized (this.isRun){
-
try {
-
if(!this.isRun() && this.files.size()==0){
-
break;
-
}
-
this.isRun.wait();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
-
this.isBreakWhile.write(true);
-
this.exec0(); //再次验证,是否还有未处理的数据
-
long endTime = System.currentTimeMillis(); //结束时间
-
this.finishCloseLog(endTime-this.closeStartTime);
-
}
-
-
/**
-
* 执行文件读取
-
*/
-
private void exec0(){
-
while(this.files.size()>0){
-
ReadFileInfo file = this.files.remove(0);
-
this.execRead(file);
-
}
-
}
-
-
/**
-
* 执行读取
-
* @param file 文件信息
-
*/
-
private void execRead(ReadFileInfo file){
-
long startTime = System.currentTimeMillis(); //开始时间
-
RandomAccessFile raf = null;
-
try {
-
raf = new RandomAccessFile(file.getFilePath(),"r");
-
} catch (FileNotFoundException e) {
-
e.printStackTrace();
-
}
-
long lLine = 0; //行数
-
while(true){
-
String strLine = null;
-
try {
-
strLine = raf.readLine(); //读取一行
-
if(strLine != null){
-
String strLineData = this.formatCharsetData(strLine, file.getCharsetName()); //进行字符集转化
-
this.readLineNotify.fileReadLineNotify(file.getFileKey(), file.getFilePath(), ++lLine, strLineData); //进行数据通知
-
}else{
-
raf.close();
-
long endTime = System.currentTimeMillis(); //结束时间
-
this.readLineNotify.fileReadLineFinishNotify(file.getFileKey(), file.getFilePath(), lLine, endTime-startTime);
-
this.printLog(file.getFileKey(), file.getFilePath(), lLine,endTime-startTime, this.files.size());
-
break;
-
}
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
-
/**
-
* 打印日志
-
* @param fileKey 文件KEY
-
* @param filePath 文件路径
-
* @param fileLines 文件行数
-
* @param useTime 执行用时
-
* @param noReadFiles 剩余没有读取的文件数
-
*/
-
public void printLog(String fileKey, String filePath, long fileLines, long useTime, int noReadFiles){
-
if(noReadFiles>0){
-
log.info("FileKey: "+fileKey +" 文件路径: " + filePath +" 文件行数:" + String.valueOf(fileLines) + " 读取用时:"+ String.valueOf(useTime)+"ms. [未处理:" +String.valueOf(noReadFiles)+"个]");
-
}else{
-
log.info("FileKey: "+fileKey +" 文件路径: " + filePath +" 文件行数:" + String.valueOf(fileLines) + " 读取用时:"+ String.valueOf(useTime)+"ms. [处理完成]");
-
}
-
}
-
-
/**
-
* 格式化字符集数据
-
* @param data 数据
-
* @param charset 字符集
-
* @return 格式化后的字符串数据
-
*/
-
private String formatCharsetData(String data, String charset){
-
String result = null;
-
byte[] byteData = null;
-
try {
-
byteData = data.getBytes("ISO-8859-1");
-
if(charset !=null && charset.length()>0){
-
result = new String(byteData,charset);
-
}
-
} catch (UnsupportedEncodingException e) {
-
e.printStackTrace();
-
result = data;
-
}
-
-
return result;
-
}
-
-
/**
-
* 读取文件
-
* @param filePath 文件路径
-
* @param charsetName 字符集名称
-
*/
-
public void readFile(String filePath, String charsetName){
-
this.readFile("SystemDefault", filePath, charsetName);
-
}
-
-
/**
-
* 读取文件
-
* @param fileKey 文件KEY
-
* @param filePath 文件路径
-
* @param charsetName 文件的字符集
-
*/
-
public void readFile(String fileKey, String filePath, String charsetName){
-
if(this.isRun()) {
-
ReadFileInfo file = new ReadFileInfo();
-
file.setCharsetName(charsetName);
-
file.setFileKey(fileKey);
-
file.setFilePath(filePath);
-
this.files.add(file);
-
-
synchronized (this.isRun) {
-
this.isRun.notifyAll();
-
}
-
}
-
}
-
-
public void setCloseValidSleepTime(long closeValidSleepTime) {
-
this.closeValidSleepTime = closeValidSleepTime;
-
}
-
-
public void beginCloseLog(){
-
log.info("开始进行,文件读取线程, 关闭操作... ...");
-
}
-
-
public void finishCloseLog(long useTime){
-
log.info("文件读取线程, 关闭完成. [用时:"+String.valueOf(useTime)+"ms]");
-
}
-
-
@Override
-
public void close() {
-
this.closeStartTime = System.currentTimeMillis(); //关闭操作,开始时间
-
this.beginCloseLog();
-
synchronized (this.isRun) {
-
this.isRun.write(false);
-
this.isRun.notifyAll();
-
}
-
-
while(!(this.files.size()==0 && this.isBreakWhile())) {
-
try {
-
Thread.sleep(this.closeValidSleepTime);
-
if(!this.isBreakWhile()) {
-
synchronized (this.isRun) {
-
this.isRun.notifyAll();
-
}
-
}
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
- }