| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- import com.alibaba.fastjson.JSONObject;
- import com.yiidata.intergration.asynclog.AsyncBucketWriter;
- import com.yiidata.intergration.asynclog.BucketPath;
- import com.yiidata.intergration.asynclog.Context;
- import com.yiidata.intergration.asynclog.FileChangedListener;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.StringUtils;
- import org.apache.commons.lang3.SystemUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
- import org.springframework.context.SmartLifecycle;
- import org.springframework.stereotype.Component;
- import java.io.File;
- import java.nio.charset.StandardCharsets;
- import java.util.Calendar;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.TimeZone;
- import java.util.regex.Matcher;
- import java.util.regex.Pattern;
- /**
- * <pre>
- *
- * Created by zhaopx.
- * Date: 2025/5/29
- * Time: 14:39
- * Vendor: exlive.cn
- *
- * </pre>
- *
- * @author zhaopx
- */
- @Slf4j
- @Component
- @ConditionalOnProperty(name = "exbooter.access-log.enable", havingValue = "true", matchIfMissing = false)
- public class SegmentFileWriterStarter implements SmartLifecycle, FileChangedListener {
- /**
- * 访问日志记录
- */
- final AsyncBucketWriter accessLogWriter = new AsyncBucketWriter();
- @Autowired
- AccessLogProperties accessLogProperties;
- /**
- * 运行,持续运行,当 dispose 后则置为 false
- */
- boolean running = false;
- @Override
- public void start() {
- try {
- boolean needRounding = false;
- TimeZone timeZone = TimeZone.getDefault();
- int roundUnit = Calendar.MINUTE;
- int roundValue = 10;
- boolean useLocalTime = true;
- String realPath = BucketPath.escapeString(accessLogProperties.getPath(), new HashMap<>(),
- timeZone, needRounding, roundUnit, roundValue, useLocalTime);
- File apiLog = new File(realPath);
- if(!apiLog.exists()) {
- apiLog.mkdirs();
- log.info("mkdir data root path: {}", apiLog.getAbsolutePath());
- }
- Context context = new Context();
- if(SystemUtils.IS_OS_WINDOWS) {
- context.put("hdfs.path", "file:/" + accessLogProperties.getPath());
- } else {
- context.put("hdfs.path", "file://" + accessLogProperties.getPath());
- }
- // 如果是小粒度的时间间隔,或者是半分间隔间隔,比如 5秒,10秒,2分,则采用 round true 的方式,计算方法为更严格
- // 如果是分钟级别,分,小时,每建议采用 round 为false,则 hdfs.filePrefix %Y-%m-%d-%H-%M 的方式
- context.put("hdfs.round", "" + accessLogProperties.isRound());
- context.put("hdfs.roundUnit", accessLogProperties.getRoundUnit());
- context.put("hdfs.roundValue", ""+accessLogProperties.getRoundValue()); // round 为true,10second 配合使用,更严格
- context.put("hdfs.useLocalTimeStamp", "true");
- context.put("hdfs.writeFormat", "text");
- // 消息持久化的方式
- context.put("hdfs.fileType", "DataStream");
- // context.put("hdfs.fileType", org.apache.commons.lang.StringUtils.equalsIgnoreCase("file", accessLogProperties.getPersistType()) ?
- // "DataStream" : StringUtils.class.getName());
- context.put("hdfs.serializer", "com.yiidata.intergration.asynclog.BodyTextEventSerializer$Builder");
- context.put("hdfs.filePrefix", accessLogProperties.getFilePrefix());
- context.put("hdfs.fileSuffix", accessLogProperties.getFileSuffix());
- // 根据数量滚动,禁止
- context.put("hdfs.rollCount", "-1");
- // 根据大小滚动,10000000B 大小滚动一次
- context.put("hdfs.rollSize", "" + accessLogProperties.getRollSize());
- // 根据时间滚动,1分钟一次,禁止
- context.put("hdfs.rollInterval", "" + accessLogProperties.getRollInterval());
- context.put("hdfs.append", "true");
- context.put("hdfs.useRawLocalFileSystem", "true");
- context.put("hdfs.maxOpenFiles", "3");
- accessLogWriter.configure(context);
- // 文件完成后通知
- accessLogWriter.addFileFinishedListener(this);
- } catch (Exception e) {
- log.error("初始化日志记录异常。", e);
- }
- // 运行态
- running = true;
- }
- /**
- * 写入数据
- * {"wifiroot":{},"address":"陕西省 渭南市 富平县 淡村镇东垚附近62米","lng":109.077791,"accuracy":550,
- * "lbsroot":{"celltowers":[{"mnc":0,"mcc":460,"signalstrength":-100,"cell_id":63563,"lac":37173}]},"lat":34.761421}
- * @param json
- */
- public void blukData(JSONObject json) {
- if(json == null || json.isEmpty()) {
- return;
- }
- Double lng = json.getDouble("lng");
- Double lat = json.getDouble("lat");
- if(lat == null || lng == null) {
- return;
- }
- if(lat <= 0.0 || lng <= 0.0) {
- return;
- }
- // 写入文件,没有 基站,还有 wifi
- accessLogWriter.write(replaceAllBlank(json.toJSONString()).getBytes(StandardCharsets.UTF_8));
- }
- /**
- * 写入数据
- * @param jsonData
- */
- public void blukData(List<JSONObject> jsonData) {
- if(jsonData == null || jsonData.isEmpty()) {
- return;
- }
- for (JSONObject json : jsonData) {
- accessLogWriter.write(replaceAllBlank(json.toJSONString()).getBytes(StandardCharsets.UTF_8));
- }
- }
- /**
- * 数据文件完成通知
- * @param eventHeader
- */
- @Override
- public void finished(Map<String, String> eventHeader) {
- // 写入数据库 id,version,fileName,filePath,fileSize
- /*
- eventHeader.put("id", UUID.randomUUID().toString().replaceAll("-", ""));
- eventHeader.put("version", "2");
- eventHeader.put("fileName", dataFile.getName());
- eventHeader.put("fileSize", String.valueOf(total));
- eventHeader.put("createAt", String.valueOf(System.currentTimeMillis()));
- */
- log.info("one data file: {}", eventHeader);
- }
- @Override
- public void stop() {
- try {
- accessLogWriter.close();
- } catch (Exception e) {
- log.error("close accessLogWriter error!", e);
- }
- running = false;
- }
- @Override
- public boolean isRunning() {
- return running;
- }
- public static final String EMPTY = "";
- /**
- * 替换所有空格,留下一个
- */
- private static final String REPLACE_BLANK_ENTER = "\\s{2,}|\t|\r|\n";
- private static final Pattern REPLACE_P = Pattern.compile(REPLACE_BLANK_ENTER);
- /**
- * 使用正则表达式删除字符串中的空格、回车、换行符、制表符
- * @param str
- * @return
- */
- public static String replaceAllBlank(String str) {
- String dest = "";
- if (StringUtils.isNotBlank(str)) {
- Matcher m = REPLACE_P.matcher(str);
- dest = m.replaceAll("");
- }
- return dest;
- }
- public static void main(String[] args) {
- }
- }
|