Selaa lähdekoodia

!2 支持windows屏幕采集、支持全局超时、无人拉流持续时长自动关闭等配置以及转封装失败自动切换到转码模式
Merge pull request !2 from eguid/master

MisterZhang 4 vuotta sitten
vanhempi
commit
885b493ffb

+ 1 - 1
src/main/java/com/zj/controller/StreamController.java

@@ -65,7 +65,7 @@ public class StreamController {
 			MediaRecodeOrTransfer mediaConvert = MediaService.cameras.get(digestHex);
 //			MediaConvert mediaConvert = MediaService.cameras.get(digestHex);
 			if(mediaConvert != null) {
-				camera.setStatus(mediaConvert.isRuning());
+				camera.setStatus(mediaConvert.isRunning());
 			} else {
 				camera.setStatus(false);
 			}

+ 35 - 11
src/main/java/com/zj/service/MediaService.java

@@ -2,6 +2,7 @@ package com.zj.service;
 
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import com.zj.entity.Camera;
@@ -13,9 +14,10 @@ import cn.hutool.crypto.digest.MD5;
 import io.netty.channel.ChannelHandlerContext;
 
 /**
- * 媒体服务
+ * 媒体服务,支持全局网络超时、读写超时、无人拉流持续时长自动关闭流等配置
  * 
  * @author ZJ
+ * @author eguid
  *
  */
 @Service
@@ -25,10 +27,34 @@ public class MediaService {
 	public static ConcurrentHashMap<String, MediaRecodeOrTransfer> cameras = new ConcurrentHashMap<>();
 
 	/**
-	 * 
-	 * @param url 源地址
+	 * 网络超时
 	 */
-	public void playForHttp(Camera camera, ChannelHandlerContext ctx, boolean autoClose) {
+	@Value("${mediaserver.netTimeout}")
+	private String netTimeout = "15000000";
+	/**
+	 * 读写超时
+	 */
+	@Value("${mediaserver.readOrWriteTimeout}")
+	private String readOrWriteTimeout = "15000000";
+
+	/**
+	 * 无人拉流观看是否自动关闭流
+	 */
+	@Value("${mediaserver.autoClose}")
+	private boolean autoClose=true;
+
+	/**
+	 * 无人拉流观看持续多久自动关闭,1分钟
+	 */
+	@Value("${mediaserver.autoClose.noClientsDuration}")
+	private long noClientsDuration=60000;
+	/**
+	 *http-flv播放
+	 * @param camera
+	 * @param ctx
+	 * @param isAutoClose
+	 */
+	public void playForHttp(Camera camera, ChannelHandlerContext ctx, Boolean isAutoClose) {
 
 		// 区分不同媒体
 		String mediaKey = MD5.create().digestHex(camera.getUrl());
@@ -38,14 +64,14 @@ public class MediaService {
 			cameras.put(mediaKey, mediaConvert);
 			mediaConvert.addHttpClient(ctx);
 		} else {
-			MediaRecodeOrTransfer mediaConvert = new MediaRecodeOrTransfer(camera, autoClose);
+			MediaRecodeOrTransfer mediaConvert = new MediaRecodeOrTransfer(camera, isAutoClose==null?autoClose:isAutoClose,noClientsDuration, netTimeout,readOrWriteTimeout);
 			cameras.put(mediaKey, mediaConvert);
 			ThreadUtil.execute(mediaConvert);
 			mediaConvert.addHttpClient(ctx);
 		}
 	}
 
-	public void playForWs(Camera camera, ChannelHandlerContext ctx, boolean autoClose) {
+	public void playForWs(Camera camera, ChannelHandlerContext ctx, Boolean isAutoClose) {
 
 		// 区分不同媒体
 		String mediaKey = MD5.create().digestHex(camera.getUrl());
@@ -55,7 +81,7 @@ public class MediaService {
 			cameras.put(mediaKey, mediaConvert);
 			mediaConvert.addWsClient(ctx);
 		} else {
-			MediaRecodeOrTransfer mediaConvert = new MediaRecodeOrTransfer(camera, autoClose);
+			MediaRecodeOrTransfer mediaConvert = new MediaRecodeOrTransfer(camera, isAutoClose==null?autoClose:isAutoClose,noClientsDuration, netTimeout,readOrWriteTimeout);
 			cameras.put(mediaKey, mediaConvert);
 			ThreadUtil.execute(mediaConvert);
 			mediaConvert.addWsClient(ctx);
@@ -72,7 +98,7 @@ public class MediaService {
 		String mediaKey = MD5.create().digestHex(camera.getUrl());
 
 		if (!cameras.containsKey(mediaKey)) {
-			MediaRecodeOrTransfer mediaConvert = new MediaRecodeOrTransfer(camera, false);
+			MediaRecodeOrTransfer mediaConvert = new MediaRecodeOrTransfer(camera, autoClose,noClientsDuration,netTimeout,readOrWriteTimeout);
 			cameras.put(mediaKey, mediaConvert);
 			ThreadUtil.execute(mediaConvert);
 		}
@@ -89,10 +115,8 @@ public class MediaService {
 
 		if (cameras.containsKey(mediaKey)) {
 			MediaRecodeOrTransfer mediaConvert = cameras.get(mediaKey);
-			mediaConvert.setRuning(false);
+			mediaConvert.setRunning(false);
 			cameras.remove(mediaKey);
 		}
 	}
-
-
 }

+ 109 - 39
src/main/java/com/zj/thread/MediaRecodeOrTransfer.java

@@ -31,8 +31,12 @@ import lombok.extern.slf4j.Slf4j;
  * <b> 什么情况下会转复用?</b>
  * <p> 视频源的音视频编码必须是浏览器和flv规范两者同时支持的编码,比如H264/AAC,</p>
  * <p> 否则将进行转码。</p>
- * <p> 不支持hevc、vvc、vp8、vp9、g711、g771a等编码</p>
- * @author eguid
+ * <p> 转封装暂不支持hevc、vvc、vp8、vp9、g711、g771a等编码</p>
+ *  <b> 转码累积延迟补偿暂未实现。</b>
+ *  * 由于转流过程中的拉流解码和编码是个线性串联链,多线程转码也不能解决该问题,后面可能需要采用主动跳包方式来解决
+ *  *
+ *  * @author ZJ
+ *  * @author eguid
  */
 @Slf4j
 public class MediaRecodeOrTransfer extends Thread {
@@ -53,7 +57,7 @@ public class MediaRecodeOrTransfer extends Thread {
 	/**
 	 * 运行状态
 	 */
-	private boolean runing = false;
+	private volatile boolean running = false;
 
 	private boolean grabberStatus = false;
 	
@@ -64,6 +68,11 @@ public class MediaRecodeOrTransfer extends Thread {
 	 */
 	private boolean autoClose = true;
 
+	/**
+	 * 无人观看时持续多久自动关闭流
+	 */
+	private long noClientsDuration = 60000;
+
 	private int hcSize, wcSize = 0;
 
 	/**
@@ -96,22 +105,35 @@ public class MediaRecodeOrTransfer extends Thread {
 	 */
 	private Thread listenThread;
 
+	/**
+	 * 网络超时,ffmpeg默认5秒,这里设置15秒
+	 */
+	private String netTimeout = "15000000";
+	/**
+	 * 读写超时,默认5秒
+	 */
+	private String readOrWriteTimeout = "15000000";
+
 	/**
 	 * @param camera
-	 * @param auto   流是否可以自动关闭
+	 * @param autoClose   流是否可以自动关闭
 	 */
-	public MediaRecodeOrTransfer(Camera camera, boolean autoClose) {
+	public MediaRecodeOrTransfer(Camera camera, boolean autoClose,long noClientsDuration,String netTimeout,String readOrWriteTimeout) {
 		super();
-		this.autoClose = autoClose;
 		this.camera = camera;
+		this.autoClose = autoClose;
+		this.netTimeout=netTimeout;
+		this.readOrWriteTimeout=readOrWriteTimeout;
+		this.autoClose=autoClose;
+		this.noClientsDuration=noClientsDuration;
 	}
 
-	public boolean isRuning() {
-		return runing;
+	public boolean isRunning() {
+		return running;
 	}
 
-	public void setRuning(boolean runing) {
-		this.runing = runing;
+	public void setRunning(boolean running) {
+		this.running = running;
 	}
 
 	/**
@@ -122,25 +144,48 @@ public class MediaRecodeOrTransfer extends Thread {
 		// 拉流器
 		grabber = new FFmpegFrameGrabber(camera.getUrl());
 		// 超时时间(15秒)
-		grabber.setOption("stimoout", "15000000");
+		grabber.setOption("stimeout", netTimeout);
 		grabber.setOption("threads", "1");
-		grabber.setPixelFormat(avutil.AV_PIX_FMT_YUV420P);
+		//grabber.setPixelFormat(avutil.AV_PIX_FMT_YUV420P);
 		// 设置缓存大小,提高画质、减少卡顿花屏
 		grabber.setOption("buffer_size", "1024000");
+
+		// 读写超时,适用于所有协议的通用读写超时
+		grabber.setOption("rw_timeout", readOrWriteTimeout);
+		// 探测视频流信息,为空默认5000000微秒
+		grabber.setOption("probesize", readOrWriteTimeout);
+		// 解析视频流信息,为空默认5000000微秒
+		grabber.setOption("analyzeduration", readOrWriteTimeout);
+
 		// 如果为rtsp流,增加配置
 		if ("rtsp".equals(camera.getUrl().substring(0, 4))) {
 			// 设置打开协议tcp / udp
 			grabber.setOption("rtsp_transport", "tcp");
 			//首选TCP进行RTP传输
 			grabber.setOption("rtsp_flags", "prefer_tcp");
-			//设置超时时间
-			grabber.setOption("stimeout","3000000");
+
+		} else if ("rtmp".equals(camera.getUrl().substring(0, 4))) {
+			// rtmp拉流缓冲区,默认3000毫秒
+			grabber.setOption("rtmp_buffer", "1000");
+			// 默认rtmp流为直播模式,不允许seek
+			//grabber.setOption("rtmp_live", "live");
+
+		} else if("desktop".equals(camera.getUrl())) {
+			//支持本地屏幕采集,可以用于监控屏幕、局域网和wifi投屏等
+			grabber.setFormat("gdigrab");
+			grabber.setOption("draw_mouse", "1");//绘制鼠标
+			grabber.setNumBuffers(0);
+			grabber.setOption("fflags", "nobuffer");
+			grabber.setOption("framerate", "25");
+			grabber.setFrameRate(25);
 		}
 
 		try {
 			grabber.start();
+			log.info("\r\n{}\r\n启动拉流器成功",camera.getUrl());
 			return grabberStatus = true;
 		} catch (Exception e) {
+			log.error("\r\n{}\r\n启动拉流器失败,网络超时或视频源不可用", camera.getUrl());
 			e.printStackTrace();
 		}
 		return grabberStatus = false;
@@ -168,6 +213,11 @@ public class MediaRecodeOrTransfer extends Thread {
 			recorder.setPixelFormat(avutil.AV_PIX_FMT_YUV420P);
 //						recorder.setAudioCodec(avcodec.AV_CODEC_ID_AAC);
 			recorder.setAudioCodecName("aac");
+			/**
+			 * 启用RDOQ算法,优化视频质量 1:在视频码率和视频质量之间取得平衡 2:最大程度优化视频质量(会降低编码速度和提高码率)
+			 */
+			recorder.setTrellis(1);
+			recorder.setMaxDelay(0);//设置延迟
 			try {
 				recorder.start();
 				return recorderStatus=true;
@@ -176,13 +226,28 @@ public class MediaRecodeOrTransfer extends Thread {
 				e1.printStackTrace();
 			}
 		}else {
-			//转复用
+			// 转复用
+			//不让recorder关联关闭outputStream
+			recorder.setCloseOutputStream(false);
 			try {
 				recorder.start(grabber.getFormatContext());
 				return recorderStatus=true;
-			} catch (org.bytedeco.javacv.FrameRecorder.Exception e1) {
-				log.info("启动转复用录制器失败", e1);
-				e1.printStackTrace();
+			} catch (org.bytedeco.javacv.FrameRecorder.Exception e) {
+				log.warn("\r\n{}\r\n启动转复用录制器失败", camera.getUrl());
+				// 如果转复用失败,则自动切换到转码模式
+				transferFlag = false;
+				if (recorder != null) {
+					try {
+						recorder.stop();
+					} catch (org.bytedeco.javacv.FrameRecorder.Exception e1) {
+					}
+				}
+				if (createTransterOrRecodeRecorder()) {
+					log.error("\r\n{}\r\n切换到转码模式", camera.getUrl());
+					return true;
+				}
+				log.error("\r\n{}\r\n切换转码模式失败", camera.getUrl());
+				e.printStackTrace();
 			}
 		}
 		return recorderStatus=false;
@@ -195,7 +260,7 @@ public class MediaRecodeOrTransfer extends Thread {
 	private boolean supportFlvFormatCodec() {
 		int vcodec=grabber.getVideoCodec();
 		int acodec=grabber.getAudioCodec();
-		return (camera.getType() == 0) && (avcodec.AV_CODEC_ID_H264==vcodec||avcodec.AV_CODEC_ID_H263==vcodec)&&(avcodec.AV_CODEC_ID_AAC==acodec||avcodec.AV_CODEC_ID_AAC_LATM==acodec);
+		return (camera.getType() == 0) && ("desktop".equals(camera.getUrl())||avcodec.AV_CODEC_ID_H264==vcodec||avcodec.AV_CODEC_ID_H263==vcodec)&&(avcodec.AV_CODEC_ID_AAC==acodec||avcodec.AV_CODEC_ID_AAC_LATM==acodec);
 	}
 	
 	/**
@@ -240,7 +305,7 @@ public class MediaRecodeOrTransfer extends Thread {
 			bos.reset();
 		}
 
-		runing = true;
+		running = true;
 		
 		//启动监听线程(用于判断是否需要自动关闭推流)
 		listenClient();
@@ -249,31 +314,37 @@ public class MediaRecodeOrTransfer extends Thread {
 		long startTime = 0;
 		long videoTS = 0;
 		long lastTime=0;
-		//累积延迟计算
-		long latencyDifference=0;//延迟差值
-		long maxLatencyThreshold=3000;//最大延迟阈值,如果lastLatencyDifference-latencyDifference>maxLatencyThreshold,则重启拉流器
-		long lastLatencyDifference=0;//当前最新延迟差值,
-		
-		long processTime=0;//上一帧处理耗时,用于延迟时间补偿,处理耗时不算进累积延迟
-		for(;runing && grabberStatus && recorderStatus;) {
+		// 累积延迟计算
+		long latencyDifference = 0;// 累积延迟
+		long lastLatencyDifference = 0;// 当前最新一组gop的延迟
+		long maxLatencyThreshold = 30000000;// 最大延迟阈值,如果lastLatencyDifference-latencyDifference>maxLatencyThreshold,则重启拉流器
+
+		long processTime = 0;// 上一帧处理耗时,用于延迟时间补偿,处理耗时不算进累积延迟
+		for(;running && grabberStatus && recorderStatus;) {
 			
 			lastTime=System.currentTimeMillis();
 			//累积延迟过大,则重新建立连接
 			if (lastLatencyDifference-latencyDifference>maxLatencyThreshold) {
+				// 重置参数
+				recorderStatus = false;
 				try {
-					grabber.restart(); // grabber.grabFrame() avformat
+					if (!transferFlag) {
+						grabber.restart(); // grabber.grabFrame() avformat
+					}
+					//装封装模式,延迟很小不需要重启,只需要清空缓存即可
 					grabber.flush();
+					recorderStatus = true;
 					log.warn("\r\n{}\r\n重连成功》》》", camera.getUrl());
-				} catch (Exception e) {
+					continue;
+				} catch (IOException e) {
 					log.warn("\r\n{}\r\n重连失败!", camera.getUrl());
-					//跳出循环,销毁拉流器和录制器
+					// 跳出循环,销毁拉流器和录制器
 					break;
 				}
 			}
 
 			try {
 				if(transferFlag) {
-//					log.error("转复用流程");
 					//转复用
 					AVPacket pkt = grabber.grabPacket();
 					if (null!=pkt&&!pkt.isNull()) {
@@ -290,7 +361,6 @@ public class MediaRecodeOrTransfer extends Thread {
 						recorder.recordPacket(pkt);
 					}
 				}else {
-//					log.error("转码流程");
 					//转码
 					Frame frame = grabber.grabFrame();
 					if (frame != null) {
@@ -345,7 +415,7 @@ public class MediaRecodeOrTransfer extends Thread {
 		} catch (IOException e) {
 			e.printStackTrace();
 		} finally {
-			runing = false;
+			running = false;
 		}
 		log.info("关闭媒体流,{} ", camera.getUrl());
 	}
@@ -391,13 +461,13 @@ public class MediaRecodeOrTransfer extends Thread {
 	/**
 	 * 新增ws客戶端
 	 * 
-	 * @param session
+	 * @param ctx
 	 */
 	public void addWsClient(ChannelHandlerContext ctx) {
 		int timeout = 0;
 		while (true) {
 			try {
-				if (runing) {
+				if (running) {
 					try {
 						if (ctx.channel().isWritable()) {
 							// 发送帧前先发送header
@@ -454,7 +524,7 @@ public class MediaRecodeOrTransfer extends Thread {
 		if (httpClients.isEmpty() && wsClients.isEmpty()) {
 			// 等待20秒还没有客户端,则关闭推流
 			if (noClient > 20) {
-				runing = false;
+				running = false;
 				String mediaKey = MD5.create().digestHex(camera.getUrl());
 				MediaService.cameras.remove(mediaKey);
 			} else {
@@ -472,7 +542,7 @@ public class MediaRecodeOrTransfer extends Thread {
 	public void listenClient() {
 		listenThread = new Thread(new Runnable() {
 			public void run() {
-				while (runing) {
+				while (running) {
 					hasClient();
 					try {
 						Thread.sleep(1000);
@@ -487,13 +557,13 @@ public class MediaRecodeOrTransfer extends Thread {
 	/**
 	 * 新增http客戶端
 	 * 
-	 * @param session
+	 * @param ctx
 	 */
 	public void addHttpClient(ChannelHandlerContext ctx) {
 		int timeout = 0;
 		while (true) {
 			try {
-				if (runing) {
+				if (running) {
 					try {
 						if (ctx.channel().isWritable()) {
 							// 发送帧前先发送header

+ 8 - 1
src/main/resources/application.properties

@@ -2,6 +2,13 @@
 server.port = 8888
 #流媒体服务端口
 mediaserver.port = 8866
-
+#网络超时,15秒
+mediaserver.netTimeout=15000000
+#读写超时,15秒
+mediaserver.readOrWriteTimeout=15000000
+#无人观看时是否自动关闭流
+mediaserver.autoClose=true
+#无人拉流观看持续多久自动关闭,1分钟
+mediaserver.autoClose.noClientsDuration=60000
 #修改代码是否自动重启
 spring.devtools.restart.enabled=false