Преглед на файлове

优化当媒体与easymedia断开时easymedia与前端不断开问题,新增server名到websocket头部信息

MisterZhang преди 4 години
родител
ревизия
1ef22ab132

+ 3 - 0
src/main/java/com/zj/common/MediaConstant.java

@@ -10,6 +10,9 @@ import java.util.concurrent.TimeUnit;
  *
  */
 public class MediaConstant {
+	
+	//header server名称
+	public static String serverName = "EasyMedia";
 
 	//自定义链式线程池
 	public static ThreadPoolExecutor threadpool = new ThreadPoolExecutor(20, 500, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), new ThreadPoolExecutor.CallerRunsPolicy());

+ 9 - 2
src/main/java/com/zj/server/FlvHandler.java

@@ -4,6 +4,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
+import com.zj.common.MediaConstant;
 import com.zj.dto.Camera;
 import com.zj.service.MediaService;
 
@@ -14,6 +15,8 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.DefaultHttpResponse;
@@ -109,7 +112,11 @@ public class FlvHandler extends SimpleChannelInboundHandler<Object> {
 				if (handshaker == null) {
 					WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
 				} else {
-					handshaker.handshake(ctx.channel(), req);
+					HttpResponse rsp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+					rsp.headers().set(HttpHeaderNames.SERVER, MediaConstant.serverName);
+					DefaultChannelPromise channelPromise = new DefaultChannelPromise(ctx.channel());
+					
+					handshaker.handshake(ctx.channel(), req, rsp.headers(), channelPromise);
 					mediaService.playForWs(camera, ctx);
 				}
 			}
@@ -148,7 +155,7 @@ public class FlvHandler extends SimpleChannelInboundHandler<Object> {
 		rsp.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
 				.set(HttpHeaderNames.CONTENT_TYPE, "video/x-flv").set(HttpHeaderNames.ACCEPT_RANGES, "bytes")
 				.set(HttpHeaderNames.PRAGMA, "no-cache").set(HttpHeaderNames.CACHE_CONTROL, "no-cache")
-				.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED).set(HttpHeaderNames.SERVER, "zhang");
+				.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED).set(HttpHeaderNames.SERVER, MediaConstant.serverName);
 		ctx.writeAndFlush(rsp);
 	}
 

+ 0 - 1
src/main/java/com/zj/service/MediaService.java

@@ -5,7 +5,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import com.zj.common.CacheMap;
 import com.zj.common.ClientType;
 import com.zj.dto.Camera;
 import com.zj.thread.MediaTransfer;

+ 49 - 37
src/main/java/com/zj/thread/MediaTransferFlvByFFmpeg.java

@@ -34,8 +34,8 @@ import lombok.extern.slf4j.Slf4j;
  * 
  * 使用ffmpeg推拉流,可以说无敌了
  * 
- * 优点:支持各种杂七杂八的流,兼容性比较好,稳定,不容易出错,自身带有重连机制,可以自己使用命令封装
- * 缺点:系统会存在多个ffmpeg进程, 无法直接操作帧,延迟优化没javacv方便
+ * 优点:支持各种杂七杂八的流,兼容性比较好,稳定,不容易出错,自身带有重连机制,可以自己使用命令封装 缺点:系统会存在多个ffmpeg进程,
+ * 无法直接操作帧,延迟优化没javacv方便
  * 
  * @author ZJ
  *
@@ -72,10 +72,10 @@ public class MediaTransferFlvByFFmpeg extends MediaTransfer {
 	private Thread listenThread;
 	private boolean running = false; // 启动
 	private boolean enableLog = true;
-	
+
 	private int hcSize, wcSize = 0;
-	
-	//记录当前
+
+	// 记录当前
 	long currentTimeMillis = System.currentTimeMillis();
 
 	/**
@@ -93,7 +93,7 @@ public class MediaTransferFlvByFFmpeg extends MediaTransfer {
 		this.camera = camera;
 		buildCommand();
 	}
-	
+
 	public MediaTransferFlvByFFmpeg(final String executable, Camera camera) {
 		command.add(executable);
 		this.camera = camera;
@@ -106,7 +106,7 @@ public class MediaTransferFlvByFFmpeg extends MediaTransfer {
 		this.enableLog = enableLog;
 		buildCommand();
 	}
-	
+
 	public boolean isEnableLog() {
 		return enableLog;
 	}
@@ -132,28 +132,24 @@ public class MediaTransferFlvByFFmpeg extends MediaTransfer {
 	 * 构建ffmpeg命令
 	 */
 	private void buildCommand() {
-		this
-		.addArgument("-rtsp_transport").addArgument("tcp")
-		.addArgument("-i").addArgument(camera.getUrl())
-		.addArgument("-max_delay").addArgument("1")
+		this.addArgument("-rtsp_transport").addArgument("tcp").addArgument("-i").addArgument(camera.getUrl())
+				.addArgument("-max_delay").addArgument("1")
 //		.addArgument("-strict").addArgument("experimental")
-		.addArgument("-g").addArgument("25")
-		.addArgument("-r").addArgument("25")
+				.addArgument("-g").addArgument("25").addArgument("-r").addArgument("25")
 //		.addArgument("-b").addArgument("200000")
 //		.addArgument("-filter_complex").addArgument("setpts='(RTCTIME - RTCSTART) / (TB * 1000000)'")
-		.addArgument("-c:v").addArgument("libx264")
-		.addArgument("-preset:v").addArgument("ultrafast")
+				.addArgument("-c:v").addArgument("libx264").addArgument("-preset:v").addArgument("ultrafast")
 //		.addArgument("-preset:v").addArgument("fast")
-		.addArgument("-tune:v").addArgument("zerolatency")
+				.addArgument("-tune:v").addArgument("zerolatency")
 //		.addArgument("-crf").addArgument("26")
-		.addArgument("-c:a").addArgument("aac")
+				.addArgument("-c:a").addArgument("aac")
 //		.addArgument("-qmin").addArgument("28")
 //		.addArgument("-qmax").addArgument("32")
 //		.addArgument("-b:v").addArgument("448k")
 //		.addArgument("-b:a").addArgument("64k")
-		.addArgument("-f").addArgument("flv");
+				.addArgument("-f").addArgument("flv");
 	}
-	
+
 //	private void buildCommand() {
 //		this
 ////		.addArgument("-rtsp_transport").addArgument("tcp")
@@ -178,6 +174,7 @@ public class MediaTransferFlvByFFmpeg extends MediaTransfer {
 
 	/**
 	 * 执行推流
+	 * 
 	 * @return
 	 */
 	public MediaTransferFlvByFFmpeg execute() {
@@ -214,7 +211,7 @@ public class MediaTransferFlvByFFmpeg extends MediaTransfer {
 					int len = 0;
 					ByteArrayOutputStream bos = new ByteArrayOutputStream();
 					while (running) {
-						
+
 						len = input.read(buffer);
 						if (len == -1) {
 							break;
@@ -228,15 +225,15 @@ public class MediaTransferFlvByFFmpeg extends MediaTransfer {
 							bos.reset();
 							continue;
 						}
-						
+
 						// 帧数据
 						byte[] data = bos.toByteArray();
 						bos.reset();
-						
+
 						// 发送到前端
 						sendFrameData(data);
 					}
-					
+
 					try {
 						client.close();
 					} catch (java.lang.Exception e) {
@@ -249,7 +246,7 @@ public class MediaTransferFlvByFFmpeg extends MediaTransfer {
 						bos.close();
 					} catch (java.lang.Exception e) {
 					}
-					
+
 					log.info("关闭媒体流-ffmpeg,{} ", camera.getUrl());
 
 				} catch (SocketTimeoutException e1) {
@@ -279,7 +276,7 @@ public class MediaTransferFlvByFFmpeg extends MediaTransfer {
 
 		outputThread.start();
 	}
-	
+
 	/**
 	 * 监听客户端
 	 */
@@ -297,7 +294,7 @@ public class MediaTransferFlvByFFmpeg extends MediaTransfer {
 		});
 		listenThread.start();
 	}
-	
+
 	/**
 	 * 监听网络异常超时
 	 */
@@ -305,14 +302,14 @@ public class MediaTransferFlvByFFmpeg extends MediaTransfer {
 		Thread listenNetTimeoutThread = new Thread(new Runnable() {
 			public void run() {
 				while (true) {
-					
-					if((System.currentTimeMillis()-currentTimeMillis) > 15000) {
+
+					if ((System.currentTimeMillis() - currentTimeMillis) > 15000) {
 						log.info("网络异常超时");
 						MediaService.cameras.remove(camera.getMediaKey());
 						stopFFmpeg();
 						break;
 					}
-					
+
 					try {
 						Thread.sleep(5000);
 					} catch (InterruptedException e) {
@@ -432,7 +429,7 @@ public class MediaTransferFlvByFFmpeg extends MediaTransfer {
 		new RuntimeException("无法启用端口");
 		return "";
 	}
-	
+
 	/**
 	 * 关闭
 	 */
@@ -444,8 +441,22 @@ public class MediaTransferFlvByFFmpeg extends MediaTransfer {
 		} catch (java.lang.Exception e) {
 			process.destroyForcibly();
 		}
+
+		// 媒体异常时,主动断开前端长连接
+		for (Entry<String, ChannelHandlerContext> entry : wsClients.entrySet()) {
+			try {
+				entry.getValue().close();
+			} catch (java.lang.Exception e) {
+			}
+		}
+		for (Entry<String, ChannelHandlerContext> entry : httpClients.entrySet()) {
+			try {
+				entry.getValue().close();
+			} catch (java.lang.Exception e) {
+			}
+		}
 	}
-	
+
 	/**
 	 * 关闭流
 	 * 
@@ -465,7 +476,7 @@ public class MediaTransferFlvByFFmpeg extends MediaTransfer {
 		if (!camera.isAutoClose()) {
 			return;
 		}
-		
+
 		if (httpClients.isEmpty() && wsClients.isEmpty()) {
 			// 等待20秒还没有客户端,则关闭推流
 			if (noClient > camera.getNoClientsDuration()) {
@@ -476,7 +487,7 @@ public class MediaTransferFlvByFFmpeg extends MediaTransfer {
 //				log.info("\r\n{}\r\n {} 秒自动关闭推拉流 \r\n", camera.getUrl(), noClientsDuration-noClient);
 			}
 		} else {
-			//重置计时
+			// 重置计时
 			noClient = 0;
 		}
 	}
@@ -518,7 +529,7 @@ public class MediaTransferFlvByFFmpeg extends MediaTransfer {
 			}
 		}
 	}
-	
+
 	/**
 	 * 新增客户端
 	 * 
@@ -581,9 +592,10 @@ public class MediaTransferFlvByFFmpeg extends MediaTransfer {
 //		System.out.println(serverSocket.getLocalPort());
 //		System.out.println(serverSocket.getInetAddress().getHostAddress());
 
-		MediaTransferFlvByFFmpeg.atPath().addArgument("-i").addArgument("rtsp://admin:VZCDOY@192.168.2.84:554/Streaming/Channels/102")
-				.addArgument("-g").addArgument("5").addArgument("-c:v").addArgument("libx264").addArgument("-c:a")
-				.addArgument("aac").addArgument("-f").addArgument("flv").execute();
+		MediaTransferFlvByFFmpeg.atPath().addArgument("-i")
+				.addArgument("rtsp://admin:VZCDOY@192.168.2.84:554/Streaming/Channels/102").addArgument("-g")
+				.addArgument("5").addArgument("-c:v").addArgument("libx264").addArgument("-c:a").addArgument("aac")
+				.addArgument("-f").addArgument("flv").execute();
 	}
 
 }

+ 14 - 0
src/main/java/com/zj/thread/MediaTransferFlvByJavacv.java

@@ -507,6 +507,20 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable
 	private void closeMedia() {
 		running = false;
 		MediaService.cameras.remove(camera.getMediaKey());
+		
+		//媒体异常时,主动断开前端长连接
+		for (Entry<String, ChannelHandlerContext> entry : wsClients.entrySet()) {
+			try {
+				entry.getValue().close();
+			} catch (java.lang.Exception e) {
+			}
+		}
+		for (Entry<String, ChannelHandlerContext> entry : httpClients.entrySet()) {
+			try {
+				entry.getValue().close();
+			} catch (java.lang.Exception e) {
+			}
+		}
 	}
 
 	/**