Prechádzať zdrojové kódy

支持转复用或转码线程、其他优化

MisterZhang 4 rokov pred
rodič
commit
1a6de08a70

+ 74 - 49
src/main/java/com/zj/thread/MediaRecodeOrTransfer.java

@@ -90,6 +90,11 @@ public class MediaRecodeOrTransfer extends Thread {
 	 * 相机
 	 */
 	private Camera camera;
+	
+	/**
+	 * 监听线程,用于监听状态
+	 */
+	private Thread listenThread;
 
 	/**
 	 * @param camera
@@ -218,6 +223,10 @@ public class MediaRecodeOrTransfer extends Thread {
 		}
 
 		runing = true;
+		
+		//启动监听线程(用于判断是否需要自动关闭推流)
+		listenClient();
+		
 		//时间戳计算
 		long startTime = 0;
 		long videoTS = 0;
@@ -244,11 +253,9 @@ public class MediaRecodeOrTransfer extends Thread {
 				}
 			}
 
-			hasClient();
-
 			try {
 				if(transferFlag) {
-					log.error("转复用流程");
+//					log.error("转复用流程");
 					//转复用
 					AVPacket pkt = grabber.grabPacket();
 					if (null!=pkt&&!pkt.isNull()) {
@@ -265,7 +272,7 @@ public class MediaRecodeOrTransfer extends Thread {
 						recorder.recordPacket(pkt);
 					}
 				}else {
-					log.error("转码流程");
+//					log.error("转码流程");
 					//转码
 					Frame frame = grabber.grabFrame();
 					if (frame != null) {
@@ -298,37 +305,9 @@ public class MediaRecodeOrTransfer extends Thread {
 				byte[] b = bos.toByteArray();
 				bos.reset();
 
-				// ws输出帧流
-				for (Entry<String, ChannelHandlerContext> entry : wsClients.entrySet()) {
-					try {
-						if (entry.getValue().channel().isWritable()) {
-							entry.getValue().writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(b)));
-						} else {
-							wsClients.remove(entry.getKey());
-							hasClient();
-						}
-					} catch (java.lang.Exception e) {
-						wsClients.remove(entry.getKey());
-						hasClient();
-						e.printStackTrace();
-					}
-				}
-
-				// http
-				for (Entry<String, ChannelHandlerContext> entry : httpClients.entrySet()) {
-					try {
-						if (entry.getValue().channel().isWritable()) {
-							entry.getValue().writeAndFlush(Unpooled.copiedBuffer(b));
-						} else {
-							httpClients.remove(entry.getKey());
-							hasClient();
-						}
-					} catch (java.lang.Exception e) {
-						httpClients.remove(entry.getKey());
-						hasClient();
-						e.printStackTrace();
-					}
-				}
+				// 发送视频到前端
+				sendFrameData(b);
+				
 				//流程耗时记录
 				if(lastTime>0) {
 					processTime=System.currentTimeMillis()-lastTime;
@@ -352,6 +331,44 @@ public class MediaRecodeOrTransfer extends Thread {
 		}
 		log.info("关闭媒体流,{} ", camera.getUrl());
 	}
+	
+	/**
+	 * 发送帧数据
+	 * 
+	 * @param data
+	 */
+	private void sendFrameData(byte[] data) {
+		// ws
+		for (Entry<String, ChannelHandlerContext> entry : wsClients.entrySet()) {
+			try {
+				if (entry.getValue().channel().isWritable()) {
+					entry.getValue().writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(data)));
+				} else {
+					wsClients.remove(entry.getKey());
+					hasClient();
+				}
+			} catch (java.lang.Exception e) {
+				wsClients.remove(entry.getKey());
+				hasClient();
+				e.printStackTrace();
+			}
+		}
+		// http
+		for (Entry<String, ChannelHandlerContext> entry : httpClients.entrySet()) {
+			try {
+				if (entry.getValue().channel().isWritable()) {
+					entry.getValue().writeAndFlush(Unpooled.copiedBuffer(data));
+				} else {
+					httpClients.remove(entry.getKey());
+					hasClient();
+				}
+			} catch (java.lang.Exception e) {
+				httpClients.remove(entry.getKey());
+				hasClient();
+				e.printStackTrace();
+			}
+		}
+	}
 
 	/**
 	 * 新增ws客戶端
@@ -398,7 +415,7 @@ public class MediaRecodeOrTransfer extends Thread {
 	}
 
 	/**
-	 * 关闭流
+	 * 判断有没有客户端,关闭流
 	 * 
 	 * @return
 	 */
@@ -414,33 +431,41 @@ public class MediaRecodeOrTransfer extends Thread {
 
 		// 自动拉流无需关闭
 		if (!autoClose) {
-			if (httpClients.isEmpty() && wsClients.isEmpty()) {
-				try {
-					Thread.sleep(5); // 不能太久
-				} catch (InterruptedException e) {
-				}
-			}
 			return;
 		}
 		if (httpClients.isEmpty() && wsClients.isEmpty()) {
-			// 5*2000=10000=10,等待10秒还没有客户端,则关闭推流
-			if (noClient > 2000) {
+			// 等待20秒还没有客户端,则关闭推流
+			if (noClient > 20) {
 				runing = false;
 				String mediaKey = MD5.create().digestHex(camera.getUrl());
 				MediaService.cameras.remove(mediaKey);
-
 			} else {
-				try {
-					Thread.sleep(5);
-				} catch (InterruptedException e) {
-				}
 				noClient += 1;
+//				log.info("\r\n{}\r\n {} 秒自动关闭推拉流 \r\n", camera.getUrl(), 11-noClient);
 			}
 		} else {
 			noClient = 0;
 		}
 	}
 
+	/**
+	 * 监听客户端,用于判断无人观看时自动关闭推流
+	 */
+	public void listenClient() {
+		listenThread = new Thread(new Runnable() {
+			public void run() {
+				while (runing) {
+					hasClient();
+					try {
+						Thread.sleep(1000);
+					} catch (InterruptedException e) {
+					}
+				}
+			}
+		});
+		listenThread.start();
+	}
+
 	/**
 	 * 新增http客戶端
 	 *