SegmentFileWriterStarter.java

import com.alibaba.fastjson.JSONObject;
import com.yiidata.intergration.asynclog.AsyncBucketWriter;
import com.yiidata.intergration.asynclog.BucketPath;
import com.yiidata.intergration.asynclog.Context;
import com.yiidata.intergration.asynclog.FileChangedListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * <pre>
 *
 * Created by zhaopx.
 * Date: 2025/5/29
 * Time: 14:39
 * Vendor: exlive.cn
 *
 * </pre>
 *
 * @author zhaopx
 */
@Slf4j
@Component
@ConditionalOnProperty(name = "exbooter.access-log.enable", havingValue = "true", matchIfMissing = false)
public class SegmentFileWriterStarter implements SmartLifecycle, FileChangedListener {

    /**
     * Access log writer.
     */
    final AsyncBucketWriter accessLogWriter = new AsyncBucketWriter();

    @Autowired
    AccessLogProperties accessLogProperties;

    /**
     * Running flag: stays true while the writer is active, set to false once stopped.
     */
    boolean running = false;
    @Override
    public void start() {
        try {
            boolean needRounding = false;
            TimeZone timeZone = TimeZone.getDefault();
            int roundUnit = Calendar.MINUTE;
            int roundValue = 10;
            boolean useLocalTime = true;
            String realPath = BucketPath.escapeString(accessLogProperties.getPath(), new HashMap<>(),
                    timeZone, needRounding, roundUnit, roundValue, useLocalTime);
            File apiLog = new File(realPath);
            if (!apiLog.exists()) {
                if (apiLog.mkdirs()) {
                    log.info("mkdir data root path: {}", apiLog.getAbsolutePath());
                } else {
                    log.warn("failed to mkdir data root path: {}", apiLog.getAbsolutePath());
                }
            }

            Context context = new Context();
            if (SystemUtils.IS_OS_WINDOWS) {
                context.put("hdfs.path", "file:/" + accessLogProperties.getPath());
            } else {
                context.put("hdfs.path", "file://" + accessLogProperties.getPath());
            }
            // For fine-grained or sub-minute intervals (e.g. 5s, 10s, 2min), use round=true: the bucket is computed more strictly.
            // For minute-or-coarser granularity (minutes, hours), round=false is recommended, with an hdfs.filePrefix pattern such as %Y-%m-%d-%H-%M.
            context.put("hdfs.round", "" + accessLogProperties.isRound());
            context.put("hdfs.roundUnit", accessLogProperties.getRoundUnit());
            // used together with round=true (e.g. 10 seconds) for stricter bucketing
            context.put("hdfs.roundValue", "" + accessLogProperties.getRoundValue());
            context.put("hdfs.useLocalTimeStamp", "true");
            context.put("hdfs.writeFormat", "text");
            // how events are persisted
            context.put("hdfs.fileType", "DataStream");
            // context.put("hdfs.fileType", org.apache.commons.lang.StringUtils.equalsIgnoreCase("file", accessLogProperties.getPersistType()) ?
            //         "DataStream" : StringUtils.class.getName());
            context.put("hdfs.serializer", "com.yiidata.intergration.asynclog.BodyTextEventSerializer$Builder");
            context.put("hdfs.filePrefix", accessLogProperties.getFilePrefix());
            context.put("hdfs.fileSuffix", accessLogProperties.getFileSuffix());
            // roll by event count: disabled
            context.put("hdfs.rollCount", "-1");
            // roll by file size, e.g. every 10,000,000 bytes
            context.put("hdfs.rollSize", "" + accessLogProperties.getRollSize());
            // roll by time interval, taken from properties
            context.put("hdfs.rollInterval", "" + accessLogProperties.getRollInterval());
            context.put("hdfs.append", "true");
            context.put("hdfs.useRawLocalFileSystem", "true");
            context.put("hdfs.maxOpenFiles", "3");
            accessLogWriter.configure(context);
            // get notified when a file is finished
            accessLogWriter.addFileFinishedListener(this);
        } catch (Exception e) {
            log.error("Failed to initialize the access log writer.", e);
        }
        // mark as running
        running = true;
    }
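    /*
     * For reference, a hypothetical application.properties that this starter could
     * bind against. Only "exbooter.access-log.enable" appears verbatim in this file
     * (in @ConditionalOnProperty); the remaining keys are assumptions, derived from
     * the AccessLogProperties getters used in start() plus Spring Boot's usual
     * kebab-case relaxed binding:
     *
     *   exbooter.access-log.enable=true
     *   exbooter.access-log.path=/data/access-log
     *   exbooter.access-log.round=false
     *   exbooter.access-log.round-unit=minute
     *   exbooter.access-log.round-value=10
     *   exbooter.access-log.file-prefix=access-%Y-%m-%d-%H-%M
     *   exbooter.access-log.file-suffix=.log
     *   exbooter.access-log.roll-size=10000000
     *   exbooter.access-log.roll-interval=-1
     */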
    /**
     * Write one record, e.g.:
     * {"wifiroot":{},"address":"陕西省 渭南市 富平县 淡村镇东垚附近62米","lng":109.077791,"accuracy":550,
     * "lbsroot":{"celltowers":[{"mnc":0,"mcc":460,"signalstrength":-100,"cell_id":63563,"lac":37173}]},"lat":34.761421}
     * @param json the record; entries without a valid lng/lat pair are dropped
     */
    public void blukData(JSONObject json) {
        if (json == null || json.isEmpty()) {
            return;
        }
        Double lng = json.getDouble("lng");
        Double lat = json.getDouble("lat");
        if (lat == null || lng == null) {
            return;
        }
        if (lat <= 0.0 || lng <= 0.0) {
            return;
        }
        // write to file; even records without cell-tower data may still carry wifi data
        accessLogWriter.write(replaceAllBlank(json.toJSONString()).getBytes(StandardCharsets.UTF_8));
    }
    /**
     * Write a batch of records, applying the same lng/lat validation as the
     * single-record overload.
     * @param jsonData the records
     */
    public void blukData(List<JSONObject> jsonData) {
        if (jsonData == null || jsonData.isEmpty()) {
            return;
        }
        for (JSONObject json : jsonData) {
            blukData(json);
        }
    }
    /**
     * Callback fired when a data file is finished (rolled and closed).
     * @param eventHeader metadata of the finished file
     */
    @Override
    public void finished(Map<String, String> eventHeader) {
        // could be persisted to a database: id, version, fileName, filePath, fileSize
        /*
        eventHeader.put("id", UUID.randomUUID().toString().replaceAll("-", ""));
        eventHeader.put("version", "2");
        eventHeader.put("fileName", dataFile.getName());
        eventHeader.put("fileSize", String.valueOf(total));
        eventHeader.put("createAt", String.valueOf(System.currentTimeMillis()));
        */
        log.info("one data file: {}", eventHeader);
    }
    @Override
    public void stop() {
        try {
            accessLogWriter.close();
        } catch (Exception e) {
            log.error("close accessLogWriter error!", e);
        }
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }
    public static final String EMPTY = "";

    /**
     * Matches runs of two or more whitespace characters, plus tabs, carriage
     * returns and newlines; single spaces are left intact.
     */
    private static final String REPLACE_BLANK_ENTER = "\\s{2,}|\t|\r|\n";
    private static final Pattern REPLACE_P = Pattern.compile(REPLACE_BLANK_ENTER);

    /**
     * Strip whitespace runs, carriage returns, newlines and tabs from a string,
     * so that every record is written as a single line.
     * @param str the input, may be blank
     * @return the stripped string, or "" for blank input
     */
    public static String replaceAllBlank(String str) {
        String dest = EMPTY;
        if (StringUtils.isNotBlank(str)) {
            Matcher m = REPLACE_P.matcher(str);
            dest = m.replaceAll(EMPTY);
        }
        return dest;
    }

    public static void main(String[] args) {
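        // A minimal smoke test of replaceAllBlank; the writer itself needs the
        // Spring context and is deliberately not exercised here.
        String raw = "{\n  \"lng\": 109.077791,\n  \"lat\": 34.761421\n}";
        // -> {"lng": 109.077791,"lat": 34.761421}  (whitespace runs and newlines removed, single spaces kept)
        System.out.println(replaceAllBlank(raw));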
    }
}
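/*
 * AccessLogProperties is not part of this file. Below is a minimal sketch of what
 * it presumably looks like, inferred from the getters used in start() above; the
 * @ConfigurationProperties prefix, the field defaults and the use of Lombok's
 * @Data are assumptions, not the actual implementation.
 */
@lombok.Data
@org.springframework.boot.context.properties.ConfigurationProperties(prefix = "exbooter.access-log")
class AccessLogProperties {
    /** whether the writer is started; checked via @ConditionalOnProperty above */
    private boolean enable = false;
    /** root directory the segment files are written to (may contain %Y-%m-%d style escapes) */
    private String path;
    /** see hdfs.round in start(): strict bucketing for fine-grained intervals */
    private boolean round = false;
    /** rounding unit, e.g. "minute" */
    private String roundUnit = "minute";
    /** rounding value, used together with round=true */
    private int roundValue = 10;
    /** prefix and suffix of the generated segment files */
    private String filePrefix;
    private String fileSuffix;
    /** persistence type, referenced by the commented-out hdfs.fileType line */
    private String persistType = "file";
    /** roll a file once it exceeds this many bytes */
    private long rollSize = 10000000L;
    /** roll a file after this interval; -1 disables time-based rolling */
    private long rollInterval = -1L;
}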