import com.alibaba.fastjson.JSONObject; import com.yiidata.intergration.asynclog.AsyncBucketWriter; import com.yiidata.intergration.asynclog.BucketPath; import com.yiidata.intergration.asynclog.Context; import com.yiidata.intergration.asynclog.FileChangedListener; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.SystemUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.SmartLifecycle; import org.springframework.stereotype.Component; import java.io.File; import java.nio.charset.StandardCharsets; import java.util.Calendar; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TimeZone; import java.util.regex.Matcher; import java.util.regex.Pattern; /** *
 *
 * Created by zhaopx.
 * Date: 2025/5/29
 * Time: 14:39
 * Vendor: exlive.cn
 *
 * 
* Spring lifecycle component that configures, starts and stops the asynchronous access-log
 * file writer, and exposes helpers to append JSON records to it.
 *
 * <p>The bean is only registered when the property {@code exbooter.access-log.enable} is
 * {@code true}.
 *
 * @author zhaopx
 */
@Slf4j
@Component
@ConditionalOnProperty(name = "exbooter.access-log.enable", havingValue = "true", matchIfMissing = false)
public class SegmentFileWriterStarter implements SmartLifecycle, FileChangedListener {

    /**
     * Writer that receives every access-log record.
     */
    final AsyncBucketWriter accessLogWriter = new AsyncBucketWriter();

    @Autowired
    AccessLogProperties accessLogProperties;

    /**
     * Lifecycle flag: set to {@code true} by {@link #start()} and back to {@code false} by
     * {@link #stop()}.
     */
    boolean running = false;

    /**
     * Creates the log root directory if necessary, builds the writer configuration from
     * {@link AccessLogProperties} and registers this instance as file-finished listener.
     *
     * <p>Initialization errors are logged, not rethrown.
     */
    @Override
    public void start() {
        try {
            ensureRootDirectory();

            Context context = new Context();
            // Windows paths already start with a drive letter, so a single slash is used there.
            if (SystemUtils.IS_OS_WINDOWS) {
                context.put("hdfs.path", "file:/" + accessLogProperties.getPath());
            } else {
                context.put("hdfs.path", "file://" + accessLogProperties.getPath());
            }

            // Author's guidance: for fine-grained or fractional intervals (e.g. 5s, 10s, 2min)
            // enable rounding, which buckets more strictly; for minute/hour granularity prefer
            // round=false together with a prefix such as %Y-%m-%d-%H-%M.
            context.put("hdfs.round", "" + accessLogProperties.isRound());
            context.put("hdfs.roundUnit", accessLogProperties.getRoundUnit());
            context.put("hdfs.roundValue", "" + accessLogProperties.getRoundValue());
            context.put("hdfs.useLocalTimeStamp", "true");
            context.put("hdfs.writeFormat", "text");

            // How records are persisted.
            context.put("hdfs.fileType", "DataStream");
            context.put("hdfs.serializer", "com.yiidata.intergration.asynclog.BodyTextEventSerializer$Builder");
            context.put("hdfs.filePrefix", accessLogProperties.getFilePrefix());
            context.put("hdfs.fileSuffix", accessLogProperties.getFileSuffix());

            // Rolling by record count is disabled; size and interval come from configuration.
            context.put("hdfs.rollCount", "-1");
            context.put("hdfs.rollSize", "" + accessLogProperties.getRollSize());
            context.put("hdfs.rollInterval", "" + accessLogProperties.getRollInterval());

            context.put("hdfs.append", "true");
            context.put("hdfs.useRawLocalFileSystem", "true");
            context.put("hdfs.maxOpenFiles", "3");
            accessLogWriter.configure(context);

            // Get notified whenever a data file has been completed.
            accessLogWriter.addFileFinishedListener(this);
        } catch (Exception e) {
            log.error("初始化日志记录异常。", e);
        }

        // NOTE(review): the flag is raised even when initialization failed, which keeps
        // stop() (and therefore close()) reachable through the lifecycle - confirm this is intended.
        running = true;
    }

    /**
     * Resolves the escape sequences in the configured path against the current local time and
     * creates the resulting directory when it does not exist yet.
     */
    private void ensureRootDirectory() {
        boolean needRounding = false;
        TimeZone timeZone = TimeZone.getDefault();
        int roundUnit = Calendar.MINUTE;
        int roundValue = 10;
        boolean useLocalTime = true;
        String realPath = BucketPath.escapeString(accessLogProperties.getPath(), new HashMap<>(),
                timeZone, needRounding, roundUnit, roundValue, useLocalTime);

        File apiLog = new File(realPath);
        if (apiLog.exists()) {
            return;
        }
        // mkdirs() also returns false when another thread/process created the directory first,
        // hence the additional isDirectory() check before reporting a failure.
        if (apiLog.mkdirs() || apiLog.isDirectory()) {
            log.info("mkdir data root path: {}", apiLog.getAbsolutePath());
        } else {
            log.warn("failed to create data root path: {}", apiLog.getAbsolutePath());
        }
    }

    /**
     * Writes one record, for example:
     * <pre>
     * {"wifiroot":{},"address":"...","lng":109.077791,"accuracy":550,
     *  "lbsroot":{"celltowers":[{"mnc":0,"mcc":460,"signalstrength":-100,"cell_id":63563,"lac":37173}]},
     *  "lat":34.761421}
     * </pre>
     *
     * <p>The record is silently dropped when it is {@code null} or empty, when {@code lat} or
     * {@code lng} is missing, or when either coordinate is not strictly positive.
     *
     * @param json the record to append
     */
    public void blukData(JSONObject json) {
        if (json == null || json.isEmpty()) {
            return;
        }
        Double lng = json.getDouble("lng");
        Double lat = json.getDouble("lat");
        if (lat == null || lng == null) {
            return;
        }
        // NOTE(review): this also rejects valid southern/western coordinates - presumably the
        // data is expected to lie in the north-eastern hemisphere; confirm.
        if (lat <= 0.0 || lng <= 0.0) {
            return;
        }

        // Records without cell-tower data may still carry wifi data, so they are written as well.
        writeJson(json);
    }

    /**
     * Writes every non-null record of the list. Unlike {@link #blukData(JSONObject)} no
     * coordinate filtering is applied.
     *
     * @param jsonData records to append; {@code null} or empty lists are ignored
     */
    public void blukData(List<JSONObject> jsonData) {
        if (jsonData == null || jsonData.isEmpty()) {
            return;
        }
        for (JSONObject json : jsonData) {
            // A single null element must not abort the remaining batch.
            if (json == null) {
                continue;
            }
            writeJson(json);
        }
    }

    /**
     * Serializes the record, strips line breaks and whitespace runs, and hands the UTF-8 bytes to the writer.
     */
    private void writeJson(JSONObject json) {
        accessLogWriter.write(replaceAllBlank(json.toJSONString()).getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Callback invoked when a data file has been completed; currently it only logs the header.
     *
     * @param eventHeader metadata describing the finished file
     */
    @Override
    public void finished(Map eventHeader) {
        // Planned but not implemented: persist id, version, fileName, filePath and fileSize
        // of the finished file to the database.
        log.info("one data file: {}", eventHeader);
    }

    /**
     * Closes the writer (errors are logged, not rethrown) and clears the running flag.
     */
    @Override
    public void stop() {
        try {
            accessLogWriter.close();
        } catch (Exception e) {
            log.error("close accessLogWriter error!", e);
        }
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    public static final String EMPTY = "";

    /**
     * Matches runs of two or more whitespace characters as well as any single tab, carriage
     * return or line feed. A lone space is not matched.
     */
    private static final String REPLACE_BLANK_ENTER = "\\s{2,}|\t|\r|\n";

    private static final Pattern REPLACE_P = Pattern.compile(REPLACE_BLANK_ENTER);

    /**
     * Removes every match of {@link #REPLACE_BLANK_ENTER} from the given string, i.e. tabs,
     * line breaks and whitespace runs of length two or more are deleted while single spaces
     * are kept.
     *
     * @param str the text to clean, may be {@code null}
     * @return the cleaned text, or an empty string when {@code str} is {@code null} or blank
     */
    public static String replaceAllBlank(String str) {
        if (!StringUtils.isNotBlank(str)) {
            return EMPTY;
        }
        Matcher m = REPLACE_P.matcher(str);
        return m.replaceAll(EMPTY);
    }

    /**
     * Intentionally empty entry point, kept for backward compatibility.
     */
    public static void main(String[] args) {
    }
}