Selaa lähdekoodia

java sengment file 写入

zhaopingxi 5 kuukautta sitten
vanhempi
commit
468f4e252f

+ 172 - 0
java-segment-file/AccessLogProperties.java

@@ -0,0 +1,172 @@
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.commons.lang.StringUtils;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * <pre>
+ *
+ * Created by zhaopx.
+ * Date: 2025/5/29
+ * Time: 14:50
+ * Vendor: exlive.cn
+ *
+ * </pre>
+ *
+ * @author zhaopx
+ */
+@Setter
+@Getter
+@Configuration
+@ConfigurationProperties(prefix = "exbooter.access-log")
+public class AccessLogProperties {
+
+    /**
+     * 文件,还是消息。file/jdbc/rabbitmq
+     */
+    String persistType = "jdbc";
+
+
+    /**
+     * 文件相关配置
+     */
+    Map<String, Object> file;
+
+
+    /**
+     * 数据库相关配置
+     */
+    Map<String, Object> jdbc;
+
+
+
+    Map<String, Object> rabbitmq;
+
+
+    Map<String, Object> kafka;
+
+
+
+    public String getPersistType() {
+        return Optional.ofNullable(StringUtils.trimToNull(persistType)).orElse("jdbc");
+    }
+
+    public String getPath() {
+        return Optional.ofNullable(StringUtils.trimToNull((String) getFile().getOrDefault("path", "./logs"))).orElse("./logs");
+    }
+
+    public Map<String, Object> getFile() {
+        if(file == null) {
+            file = new HashMap<>();
+        }
+        return file;
+    }
+
+    public String getFilePrefix() {
+        return Optional.ofNullable(StringUtils.trimToNull((String) getFile().getOrDefault("filePrefix", "api-access."))).orElse("api-access.");
+    }
+
+
+    public String getFileSuffix() {
+        return Optional.ofNullable(StringUtils.trimToNull((String) getFile().getOrDefault("fileSuffix", ".log"))).orElse(".log");
+    }
+
+    /**
+     * 开启 round
+     * @return
+     */
+    public boolean isRound() {
+        return Optional.ofNullable((Boolean) getFile().getOrDefault("round", false)).orElse(false);
+    }
+
+    /**
+     * 滚单位。second,minute,hour
+     */
+    public String getRoundUnit() {
+        return Optional.ofNullable(StringUtils.trimToNull((String) getFile().getOrDefault("roundUnit", "minute"))).orElse("minute");
+    }
+
+    /**
+     * 滚动值,默认 1
+     */
+    public int getRoundValue() {
+        return Optional.ofNullable((Integer) getFile().getOrDefault("roundValue", 1)).orElse(1);
+    }
+
+    /**
+     * 滚动大小,10M
+     */
+    public int getRollSize() {
+        return Optional.ofNullable((Integer) getFile().getOrDefault("rollSize", 10 * 1024 * 1024)).orElse(10 * 1024 * 1024);
+    }
+
+    /**
+     * 滚动时间,60s
+     */
+    public int getRollInterval() {
+        return Optional.ofNullable((Integer) getFile().getOrDefault("rollInterval", 60)).orElse(60);
+    }
+
+    public Map<String, Object> getJdbc() {
+        if(jdbc == null) {
+            jdbc = new HashMap<>();
+        }
+        return jdbc;
+    }
+
+    public Map<String, Object> getRabbitmq() {
+        if(rabbitmq == null) {
+            rabbitmq = new HashMap<>();
+        }
+        return rabbitmq;
+    }
+
+    public Map<String, Object> getKafka() {
+        if(kafka == null) {
+            kafka = new HashMap<>();
+        }
+        return kafka;
+    }
+
+
+    /**
+     * RabbitMQ 相关配置
+     */
+    public String getHost() {
+        return Optional.ofNullable(StringUtils.trimToNull((String) getRabbitmq().getOrDefault("localhost", "localhost"))).orElse("localhost");
+    }
+
+    public int getPort() {
+        return Optional.ofNullable((Integer) getRabbitmq().getOrDefault("port", 5217)).orElse(5217);
+    }
+
+    public String getUsername() {
+        return StringUtils.trimToNull((String) getRabbitmq().get("username"));
+    }
+
+    public String getPassword() {
+        return StringUtils.trimToNull((String) getRabbitmq().get("password"));
+    }
+
+
+    public String getVirtualHost() {
+        return Optional.ofNullable(StringUtils.trimToNull((String) getRabbitmq().getOrDefault("virtualHost", "/"))).orElse("/");
+    }
+
+    public String getExchangeName() {
+        return StringUtils.trimToNull((String) getRabbitmq().get("exchangeName"));
+    }
+
+    public String getRoutingKey() {
+        return StringUtils.trimToNull((String) getRabbitmq().get("routingKey"));
+    }
+
+    public boolean getAutoDelete() {
+        return Optional.ofNullable((Boolean) getRabbitmq().getOrDefault("autoDelete", false)).orElse(false);
+    }
+}

+ 202 - 0
java-segment-file/SegmentFileWriterStarter.java

@@ -0,0 +1,202 @@
+import com.alibaba.fastjson.JSONObject;
+import com.yiidata.intergration.asynclog.AsyncBucketWriter;
+import com.yiidata.intergration.asynclog.BucketPath;
+import com.yiidata.intergration.asynclog.Context;
+import com.yiidata.intergration.asynclog.FileChangedListener;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.SmartLifecycle;
+import org.springframework.stereotype.Component;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * <pre>
+ *
+ * Created by zhaopx.
+ * Date: 2025/5/29
+ * Time: 14:39
+ * Vendor: exlive.cn
+ *
+ * </pre>
+ *
+ * @author zhaopx
+ */
+@Slf4j
+@Component
+@ConditionalOnProperty(name = "exbooter.access-log.enable", havingValue = "true", matchIfMissing = false)
+public class SegmentFileWriterStarter implements SmartLifecycle, FileChangedListener {
+
+    /**
+     * 访问日志记录
+     */
+    final AsyncBucketWriter accessLogWriter = new AsyncBucketWriter();
+
+    @Autowired
+    AccessLogProperties accessLogProperties;
+
+    /**
+     * 运行,持续运行,当 dispose 后则置为 false
+     */
+    boolean running = false;
+
+    @Override
+    public void start() {
+        try {
+            boolean needRounding = false;
+            TimeZone timeZone = TimeZone.getDefault();
+            int roundUnit = Calendar.MINUTE;
+            int roundValue = 10;
+            boolean useLocalTime = true;
+            String realPath = BucketPath.escapeString(accessLogProperties.getPath(), new HashMap<>(),
+                    timeZone, needRounding, roundUnit, roundValue, useLocalTime);
+            File apiLog = new File(realPath);
+            if(!apiLog.exists()) {
+                apiLog.mkdirs();
+                log.info("mkdir data root path: {}", apiLog.getAbsolutePath());
+            }
+            Context context = new Context();
+            if(SystemUtils.IS_OS_WINDOWS) {
+                context.put("hdfs.path", "file:/" + accessLogProperties.getPath());
+            } else {
+                context.put("hdfs.path", "file://" + accessLogProperties.getPath());
+            }
+            // 如果是小粒度的时间间隔,或者是半分间隔间隔,比如 5秒,10秒,2分,则采用 round true 的方式,计算方法为更严格
+            // 如果是分钟级别,分,小时,每建议采用 round 为false,则 hdfs.filePrefix %Y-%m-%d-%H-%M 的方式
+            context.put("hdfs.round", "" + accessLogProperties.isRound());
+            context.put("hdfs.roundUnit", accessLogProperties.getRoundUnit());
+            context.put("hdfs.roundValue", ""+accessLogProperties.getRoundValue()); // round 为true,10second 配合使用,更严格
+            context.put("hdfs.useLocalTimeStamp", "true");
+
+            context.put("hdfs.writeFormat", "text");
+            // 消息持久化的方式
+            context.put("hdfs.fileType", "DataStream");
+            // context.put("hdfs.fileType", org.apache.commons.lang.StringUtils.equalsIgnoreCase("file", accessLogProperties.getPersistType()) ?
+            //        "DataStream" : StringUtils.class.getName());
+            context.put("hdfs.serializer", "com.yiidata.intergration.asynclog.BodyTextEventSerializer$Builder");
+            context.put("hdfs.filePrefix", accessLogProperties.getFilePrefix());
+            context.put("hdfs.fileSuffix", accessLogProperties.getFileSuffix());
+            // 根据数量滚动,禁止
+            context.put("hdfs.rollCount", "-1");
+            // 根据大小滚动,10000000B 大小滚动一次
+            context.put("hdfs.rollSize", "" + accessLogProperties.getRollSize());
+            // 根据时间滚动,1分钟一次,禁止
+            context.put("hdfs.rollInterval", "" + accessLogProperties.getRollInterval());
+            context.put("hdfs.append", "true");
+            context.put("hdfs.useRawLocalFileSystem", "true");
+            context.put("hdfs.maxOpenFiles", "3");
+            accessLogWriter.configure(context);
+            // 文件完成后通知
+            accessLogWriter.addFileFinishedListener(this);
+        } catch (Exception e) {
+            log.error("初始化日志记录异常。", e);
+        }
+        // 运行态
+        running = true;
+    }
+
+    /**
+     * 写入数据
+     * {"wifiroot":{},"address":"陕西省 渭南市 富平县 淡村镇东垚附近62米","lng":109.077791,"accuracy":550,
+     * "lbsroot":{"celltowers":[{"mnc":0,"mcc":460,"signalstrength":-100,"cell_id":63563,"lac":37173}]},"lat":34.761421}
+     * @param json
+     */
+    public void blukData(JSONObject json) {
+        if(json == null || json.isEmpty()) {
+            return;
+        }
+
+        Double lng = json.getDouble("lng");
+        Double lat = json.getDouble("lat");
+        if(lat == null || lng == null) {
+            return;
+        }
+        if(lat <= 0.0 || lng <= 0.0) {
+            return;
+        }
+        // 写入文件,没有 基站,还有 wifi
+        accessLogWriter.write(replaceAllBlank(json.toJSONString()).getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * 写入数据
+     * @param jsonData
+     */
+    public void blukData(List<JSONObject> jsonData) {
+        if(jsonData == null || jsonData.isEmpty()) {
+            return;
+        }
+        for (JSONObject json : jsonData) {
+            accessLogWriter.write(replaceAllBlank(json.toJSONString()).getBytes(StandardCharsets.UTF_8));
+        }
+    }
+
+    /**
+     * 数据文件完成通知
+     * @param eventHeader
+     */
+    @Override
+    public void finished(Map<String, String> eventHeader) {
+        // 写入数据库 id,version,fileName,filePath,fileSize
+        /*
+        eventHeader.put("id", UUID.randomUUID().toString().replaceAll("-", ""));
+        eventHeader.put("version", "2");
+        eventHeader.put("fileName", dataFile.getName());
+        eventHeader.put("fileSize", String.valueOf(total));
+        eventHeader.put("createAt", String.valueOf(System.currentTimeMillis()));
+         */
+        log.info("one data file: {}", eventHeader);
+    }
+
+    @Override
+    public void stop() {
+        try {
+            accessLogWriter.close();
+        } catch (Exception e) {
+            log.error("close accessLogWriter error!", e);
+        }
+        running = false;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return running;
+    }
+
+    public static final String EMPTY = "";
+
+    /**
+     * 替换所有空格,留下一个
+     */
+    private static final String REPLACE_BLANK_ENTER = "\\s{2,}|\t|\r|\n";
+    private static final Pattern REPLACE_P = Pattern.compile(REPLACE_BLANK_ENTER);
+
+    /**
+     * 使用正则表达式删除字符串中的空格、回车、换行符、制表符
+     * @param str
+     * @return
+     */
+    public static String replaceAllBlank(String str) {
+        String dest = "";
+        if (StringUtils.isNotBlank(str)) {
+            Matcher m = REPLACE_P.matcher(str);
+            dest = m.replaceAll("");
+        }
+        return dest;
+    }
+
+    public static void main(String[] args) {
+
+    }
+}