From 7593a0ce30235db587f9e1f40250addf4cbdb30c Mon Sep 17 00:00:00 2001
From: cdb156 <15658067087@163.com>
Date: Fri, 2 Feb 2018 15:45:11 +0800
Subject: [PATCH 1/2] =?UTF-8?q?=E4=BF=AE=E6=94=B9FindDiffRows=EF=BC=8CRcov?=
=?UTF-8?q?erNotProData=E4=BB=A3=E7=A0=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../collect/expand/merge/FindDiffRows.java | 45 ++++++-----------
.../expand/merge/RcoverNotProData.java | 48 ++++++++++++++-----
2 files changed, 49 insertions(+), 44 deletions(-)
diff --git a/FtpServer/src/main/java/com/hzgc/collect/expand/merge/FindDiffRows.java b/FtpServer/src/main/java/com/hzgc/collect/expand/merge/FindDiffRows.java
index 5c9385f6..a9079310 100644
--- a/FtpServer/src/main/java/com/hzgc/collect/expand/merge/FindDiffRows.java
+++ b/FtpServer/src/main/java/com/hzgc/collect/expand/merge/FindDiffRows.java
@@ -11,10 +11,9 @@
* 其中包含以下三个方法:
*
* getNotProRows:获取集合中未处理的所有行;
- * getErrProRows:获取集合中处理失败的所有行;
* getAllDiffRows:获取集合中不同行;
*/
-public class FindDiffRows {
+class FindDiffRows {
private Logger LOG = Logger.getLogger(FindDiffRows.class);
/**
@@ -23,9 +22,8 @@ public class FindDiffRows {
* @param allRows 日志合并后的所有行
* @return List对象 未处理数据的集合
*/
- public List getNotProRows(List allRows) {
+ List getNotProRows(List allRows) {
List notProList = new ArrayList<>();
- String row;
if (allRows == null || allRows.size() == 0) {
LOG.warn("The unionAllRows size is None");
} else if (allRows.size() == 1) {
@@ -63,42 +61,17 @@ public List getNotProRows(List allRows) {
notProList.add(allRows.get(allRows.size() - 1));
}
}
+ Collections.sort(notProList, new ListComparator());
return notProList;
}
- /**
- * 获取合并后日志中数据处理失败的集合
- *
- * @param allRows 日志合并后的所有行
- * @return List对象 合并后集合中数据处理失败的集合
- */
- public List getErrProRows(List allRows) {
- List failList = new ArrayList<>();
- String tmp;
- if (allRows == null || allRows.size() == 0) {
- LOG.warn("The unionAllRows size is None");
- } else {
- Collections.sort(allRows);
- for (String allRow : allRows) {
- LogEvent event = JSONHelper.toObject(allRow, LogEvent.class);
- String processState = event.getStatus();
- //根据状态是否为零判断数据是否处理成功
- if (!processState.equals("0")) {
- failList.add(allRow);
- }
- }
- }
- return failList;
- }
-
-
/**
* 获取集合中不同行
*
* @param allRows 合并后日志集合
* @return List对象 返回合并后不同行的集合
*/
- public List getAllDiffRows(List allRows) {
+ List getAllDiffRows(List allRows) {
List rows = new ArrayList<>();
String row;
if (allRows == null || allRows.size() == 0) {
@@ -122,7 +95,17 @@ public List getAllDiffRows(List allRows) {
}
}
+ Collections.sort(rows, new ListComparator());
return rows;
}
+
+ private class ListComparator implements Comparator {
+ @Override
+ public int compare(String row1, String row2) {
+ LogEvent event1 = JSONHelper.toObject(row1, LogEvent.class);
+ LogEvent event2 = JSONHelper.toObject(row2, LogEvent.class);
+ return Long.compare(event1.getCount(), event2.getCount());
+ }
+ }
}
diff --git a/FtpServer/src/main/java/com/hzgc/collect/expand/merge/RcoverNotProData.java b/FtpServer/src/main/java/com/hzgc/collect/expand/merge/RcoverNotProData.java
index f97bbb9f..44e1fc37 100644
--- a/FtpServer/src/main/java/com/hzgc/collect/expand/merge/RcoverNotProData.java
+++ b/FtpServer/src/main/java/com/hzgc/collect/expand/merge/RcoverNotProData.java
@@ -14,17 +14,36 @@
/**
* 恢复未处理的数据(曹大报)
+ *
+ * * 整体流程:
+ * 1,根据CommonConf获取process根路径processLogDir;
+ * 2,遍历process 目录中的所有文件,得到日记文件的一个绝对路径,放入一个List中,processFiles;
+ * 判断 processFiles 是否为空(process 目录下是否有文件);
+ * 1) processFiles不为空,遍历获取 processFiles 中每一个元素(process文件的绝对路径),例如:/opt/logdata/process/p-0/000003000000.log;
+ *
+ * 根据process文件获取receive文件路径,判断对应receive文件(/opt/logdata/receive/r-0/000003000000.log)是否存在;
+ * 存在:
+ * 1,读取process文件和receive文件,合并到一个List 里面(rowsListFactory);
+ * 2,排序,对比,获取有序的数据集合 notProRows;
+ * 3,遍历 notProRows 每一条数据,提取特征获取faceObject发送Kafka;
+ * 根据发送kafka是否成功分别写入不同日志文件中;
+ * 第一条数据发送kafka失败;
+ * 结束循环返回false;
+ * 不存在:
+ * 把/opt/logdata/process/p-0/000003000000.log 文件移动到success目录下,跳过这层循环。
+ * 2)processFiles 为空
+ * 结束循环;
+ * 3,结束
*/
public class RcoverNotProData {
private Logger LOG = Logger.getLogger(RcoverNotProData.class);
private final SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd");
- public boolean recoverNotProData(CommonConf commonConf) {
+ public boolean RecoverNotProData(CommonConf commonConf) {
FileUtil fileUtil = new FileUtil();
LogEvent logEvent = new LogEvent();
String processLogDir = commonConf.getProcessLogDir();
-
FileFactory fileFactory = new FileFactory(processLogDir);
List processFiles = fileFactory.getAllProcessFiles();
//标记恢复数据是否成功,默认false
@@ -35,7 +54,7 @@ public boolean recoverNotProData(CommonConf commonConf) {
for (String processFile : processFiles) {
//获取receive绝对路径
String receiveFile = fileUtil.getRecFileFromProFile(processFile);
- //判断对应receive文件是否存在,存在则合并,不存在则删除
+ //判断对应receive文件是否存在,存在则合并,不存在则移动位置
if (fileUtil.isFileExist(receiveFile)) {
//获取队列ID
String queueID = processFile.substring(processFile.lastIndexOf("-") + 1, processFile.lastIndexOf("/"));
@@ -59,27 +78,30 @@ public boolean recoverNotProData(CommonConf commonConf) {
LOG.warn("first data send to Kafka failure");
return false;
} else {
- LOG.info("Write log queueID is" + queueID + "" + processFile);
- //向对应的processfile中写入日志
+ //向对应的processFile中写入日志
logEvent.setPath(ftpUrl);
+ logEvent.setCount(count);
+ logEvent.setTimeStamp(Long.valueOf(SDF.format(new Date())));
if (success) {
logEvent.setStatus("0");
+ LOG.info("Send to Kafka success,write log to processFile :" + processFile);
+ fileUtil.writeMergeFile(logEvent, processFile);
} else {
+ //发送Kafka失败
logEvent.setStatus("1");
+ String errorFilePath = fileUtil.getErrFileFromProFile();
+ LOG.warn("Send to Kafka failure ,write log to errorLogFile :");
+ dataProcessLogWriter.errorLogWrite(errorFilePath, logEvent);
}
- logEvent.setCount(count);
- logEvent.setTimeStamp(Long.valueOf(SDF.format(new Date())));
- dataProcessLogWriter.writeEvent(logEvent);
recoverSuccess = true;
}
}
}
} else {
- //对应receive 文件不存在,删除对应文件
- boolean deleteFile = fileUtil.deleteFile(processFile);
- if (!deleteFile) {
- LOG.warn("delete file " + processFile + "failure,please check it!");
- }
+ //对应receive 文件不存在,将process文件移动到success目录下
+ LOG.info("Can't find receiveFile,move processFile To SuccessDir");
+ String successFilePath = fileUtil.getSuccessFilePath(processFile);
+ fileUtil.moveFile(processFile, successFilePath);
recoverSuccess = true;
}
}
From ea11a0ab5fd80cc19d7edf9f875221500e7d3525 Mon Sep 17 00:00:00 2001
From: cdb156 <15658067087@163.com>
Date: Mon, 5 Feb 2018 16:15:51 +0800
Subject: [PATCH 2/2] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=8F=91=E9=80=81kafka?=
=?UTF-8?q?=E5=90=8E=E7=A7=BB=E5=8A=A8=E6=96=87=E4=BB=B6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../expand/merge/RcoverNotProData.java | 23 +++++++++++++------
1 file changed, 16 insertions(+), 7 deletions(-)
diff --git a/FtpServer/src/main/java/com/hzgc/collect/expand/merge/RcoverNotProData.java b/FtpServer/src/main/java/com/hzgc/collect/expand/merge/RcoverNotProData.java
index 44e1fc37..69c691bf 100644
--- a/FtpServer/src/main/java/com/hzgc/collect/expand/merge/RcoverNotProData.java
+++ b/FtpServer/src/main/java/com/hzgc/collect/expand/merge/RcoverNotProData.java
@@ -15,7 +15,7 @@
/**
* 恢复未处理的数据(曹大报)
*
- * * 整体流程:
+ * 整体流程:
* 1,根据CommonConf获取process根路径processLogDir;
* 2,遍历process 目录中的所有文件,得到日记文件的一个绝对路径,放入一个List中,processFiles;
* 判断 processFiles 是否为空(process 目录下是否有文件);
@@ -46,8 +46,6 @@ public boolean RecoverNotProData(CommonConf commonConf) {
String processLogDir = commonConf.getProcessLogDir();
FileFactory fileFactory = new FileFactory(processLogDir);
List processFiles = fileFactory.getAllProcessFiles();
- //标记恢复数据是否成功,默认false
- boolean recoverSuccess = false;
//判断process根目录下是否有文件
if (processFiles != null && processFiles.size() != 0) {
@@ -62,6 +60,8 @@ public boolean RecoverNotProData(CommonConf commonConf) {
RowsListFactory rowsListFactory = new RowsListFactory(processFile, receiveFile);
//获取未处理的数据
List notProRows = rowsListFactory.getNotProRows();
+ //用于标记发送Kafka数据数
+ long rowCount = 0;
for (int j = 0; j < notProRows.size(); j++) {
String row = notProRows.get(j);
//获取未处理数据的ftpUrl
@@ -86,29 +86,38 @@ public boolean RecoverNotProData(CommonConf commonConf) {
logEvent.setStatus("0");
LOG.info("Send to Kafka success,write log to processFile :" + processFile);
fileUtil.writeMergeFile(logEvent, processFile);
+ rowCount++;
} else {
//发送Kafka失败
logEvent.setStatus("1");
String errorFilePath = fileUtil.getErrFileFromProFile();
LOG.warn("Send to Kafka failure ,write log to errorLogFile :");
dataProcessLogWriter.errorLogWrite(errorFilePath, logEvent);
+ rowCount++;
}
- recoverSuccess = true;
}
}
}
+ if (rowCount == notProRows.size()) {
+ //处理process文件完成,移动process文件和receive文件到success目录下;
+ String sucProFilePath = fileUtil.getSuccessFilePath(processFile);
+ fileUtil.moveFile(processFile, sucProFilePath);
+ String sucRecFilePath = fileUtil.getSuccessFilePath(receiveFile);
+ fileUtil.moveFile(receiveFile, sucRecFilePath);
+ } else {
+ LOG.warn("send to Kafka data less than NotProRows size, Please check it!");
+ return false;
+ }
} else {
//对应receive 文件不存在,将process文件移动到success目录下
LOG.info("Can't find receiveFile,move processFile To SuccessDir");
String successFilePath = fileUtil.getSuccessFilePath(processFile);
fileUtil.moveFile(processFile, successFilePath);
- recoverSuccess = true;
}
}
- return recoverSuccess;
} else {
LOG.info("The path of " + processLogDir + "is Nothing");
- return true;
}
+ return true;
}
}