Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
* 其中包含以下三个方法:
* <p>
* getNotProRows:获取集合中未处理的所有行;
* getErrProRows:获取集合中处理失败的所有行;
* getAllDiffRows:获取集合中不同行;
*/
public class FindDiffRows {
class FindDiffRows {
private Logger LOG = Logger.getLogger(FindDiffRows.class);

/**
Expand All @@ -23,9 +22,8 @@ public class FindDiffRows {
* @param allRows 日志合并后的所有行
* @return List对象 未处理数据的集合
*/
public List<String> getNotProRows(List<String> allRows) {
List<String> getNotProRows(List<String> allRows) {
List<String> notProList = new ArrayList<>();
String row;
if (allRows == null || allRows.size() == 0) {
LOG.warn("The unionAllRows size is None");
} else if (allRows.size() == 1) {
Expand Down Expand Up @@ -63,42 +61,17 @@ public List<String> getNotProRows(List<String> allRows) {
notProList.add(allRows.get(allRows.size() - 1));
}
}
Collections.sort(notProList, new ListComparator());
return notProList;
}

/**
* 获取合并后日志中数据处理失败的集合
*
* @param allRows 日志合并后的所有行
* @return List对象 合并后集合中数据处理失败的集合
*/
public List<String> getErrProRows(List<String> allRows) {
List<String> failList = new ArrayList<>();
String tmp;
if (allRows == null || allRows.size() == 0) {
LOG.warn("The unionAllRows size is None");
} else {
Collections.sort(allRows);
for (String allRow : allRows) {
LogEvent event = JSONHelper.toObject(allRow, LogEvent.class);
String processState = event.getStatus();
//根据状态是否为零判断数据是否处理成功
if (!processState.equals("0")) {
failList.add(allRow);
}
}
}
return failList;
}


/**
* 获取集合中不同行
*
* @param allRows 合并后日志集合
* @return List对象 返回合并后不同行的集合
*/
public List<String> getAllDiffRows(List<String> allRows) {
List<String> getAllDiffRows(List<String> allRows) {
List<String> rows = new ArrayList<>();
String row;
if (allRows == null || allRows.size() == 0) {
Expand All @@ -122,7 +95,17 @@ public List<String> getAllDiffRows(List<String> allRows) {
}

}
Collections.sort(rows, new ListComparator());
return rows;
}


private class ListComparator implements Comparator<String> {
@Override
public int compare(String row1, String row2) {
LogEvent event1 = JSONHelper.toObject(row1, LogEvent.class);
LogEvent event2 = JSONHelper.toObject(row2, LogEvent.class);
return Long.compare(event1.getCount(), event2.getCount());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,54 @@

/**
* 恢复未处理的数据(曹大报)
*
* 整体流程:
* 1,根据CommonConf获取process根路径processLogDir;
* 2,遍历process 目录中的所有文件,得到日记文件的一个绝对路径,放入一个List中,processFiles;
* 判断 processFiles 是否为空(process 目录下是否有文件);
* 1) processFiles不为空,遍历获取 processFiles 中每一个元素(process文件的绝对路径),例如:/opt/logdata/process/p-0/000003000000.log;
*
* 根据process文件获取receive文件路径,判断对应receive文件(/opt/logdata/receive/r-0/000003000000.log)是否存在;
* 存在:
* 1,读取process文件和receive文件,合并到一个List 里面(rowsListFactory);
* 2,排序,对比,获取有序的数据集合 notProRows;
* 3,遍历 notProRows 每一条数据,提取特征获取faceObject发送Kafka;
* 根据发送kafka是否成功分别写入不同日志文件中;
* 第一条数据发送kafka失败;
* 结束循环返回false;
* 不存在:
* 把/opt/logdata/process/p-0/000003000000.log 文件移动到success目录下,跳过这层循环。
* 2)processFiles 为空
* 结束循环;
* 3,结束
*/
public class RcoverNotProData {
private Logger LOG = Logger.getLogger(RcoverNotProData.class);
private final SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd");


public boolean recoverNotProData(CommonConf commonConf) {
public boolean RecoverNotProData(CommonConf commonConf) {
FileUtil fileUtil = new FileUtil();
LogEvent logEvent = new LogEvent();
String processLogDir = commonConf.getProcessLogDir();

FileFactory fileFactory = new FileFactory(processLogDir);
List<String> processFiles = fileFactory.getAllProcessFiles();
//标记恢复数据是否成功,默认false
boolean recoverSuccess = false;

//判断process根目录下是否有文件
if (processFiles != null && processFiles.size() != 0) {
for (String processFile : processFiles) {
//获取receive绝对路径
String receiveFile = fileUtil.getRecFileFromProFile(processFile);
//判断对应receive文件是否存在,存在则合并,不存在则删除
//判断对应receive文件是否存在,存在则合并,不存在则移动位置
if (fileUtil.isFileExist(receiveFile)) {
//获取队列ID
String queueID = processFile.substring(processFile.lastIndexOf("-") + 1, processFile.lastIndexOf("/"));
DataProcessLogWriter dataProcessLogWriter = new DataProcessLogWriter(commonConf, queueID);
RowsListFactory rowsListFactory = new RowsListFactory(processFile, receiveFile);
//获取未处理的数据
List<String> notProRows = rowsListFactory.getNotProRows();
//用于标记发送Kafka数据数
long rowCount = 0;
for (int j = 0; j < notProRows.size(); j++) {
String row = notProRows.get(j);
//获取未处理数据的ftpUrl
Expand All @@ -59,34 +78,46 @@ public boolean recoverNotProData(CommonConf commonConf) {
LOG.warn("first data send to Kafka failure");
return false;
} else {
LOG.info("Write log queueID is" + queueID + "" + processFile);
//向对应的processfile中写入日志
//向对应的processFile中写入日志
logEvent.setPath(ftpUrl);
logEvent.setCount(count);
logEvent.setTimeStamp(Long.valueOf(SDF.format(new Date())));
if (success) {
logEvent.setStatus("0");
LOG.info("Send to Kafka success,write log to processFile :" + processFile);
fileUtil.writeMergeFile(logEvent, processFile);
rowCount++;
} else {
//发送Kafka失败
logEvent.setStatus("1");
String errorFilePath = fileUtil.getErrFileFromProFile();
LOG.warn("Send to Kafka failure ,write log to errorLogFile :");
dataProcessLogWriter.errorLogWrite(errorFilePath, logEvent);
rowCount++;
}
logEvent.setCount(count);
logEvent.setTimeStamp(Long.valueOf(SDF.format(new Date())));
dataProcessLogWriter.writeEvent(logEvent);
recoverSuccess = true;
}
}
}
} else {
//对应receive 文件不存在,删除对应文件
boolean deleteFile = fileUtil.deleteFile(processFile);
if (!deleteFile) {
LOG.warn("delete file " + processFile + "failure,please check it!");
if (rowCount == notProRows.size()) {
//处理process文件完成,移动process文件和receive文件到success目录下;
String sucProFilePath = fileUtil.getSuccessFilePath(processFile);
fileUtil.moveFile(processFile, sucProFilePath);
String sucRecFilePath = fileUtil.getSuccessFilePath(receiveFile);
fileUtil.moveFile(receiveFile, sucRecFilePath);
} else {
LOG.warn("send to Kafka data less than NotProRows size, Please check it!");
return false;
}
recoverSuccess = true;
} else {
//对应receive 文件不存在,将process文件移动到success目录下
LOG.info("Can't find receiveFile,move processFile To SuccessDir");
String successFilePath = fileUtil.getSuccessFilePath(processFile);
fileUtil.moveFile(processFile, successFilePath);
}
}
return recoverSuccess;
} else {
LOG.info("The path of " + processLogDir + "is Nothing");
return true;
}
return true;
}
}