package com.zj.server; import java.util.HashMap; import java.util.List; import java.util.Map; import com.zj.entity.Camera; import com.zj.service.CameraRepository; import com.zj.service.MediaService; import cn.hutool.core.util.StrUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.QueryStringDecoder; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; // http://localhost:8866/live?url=rtsp://admin:VZCDOY@192.168.2.84:554/Streaming/Channels/102 // ws://localhost:8866/live?url=rtsp://admin:VZCDOY@192.168.2.84:554/Streaming/Channels/102 @Slf4j public class FlvHandler extends SimpleChannelInboundHandler { private MediaService mediaService; private WebSocketServerHandshaker handshaker; private CameraRepository cameraRepository; public FlvHandler(MediaService mediaService, CameraRepository cameraRepository) { super(); this.mediaService = mediaService; this.cameraRepository = cameraRepository; } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { FullHttpRequest req = (FullHttpRequest) msg; // Map parmMap = new RequestParser(msg).parse(); QueryStringDecoder decoder = new QueryStringDecoder(req.uri()); // 判断请求uri if (!"/live".equals(decoder.path())) { log.info("uri有误"); sendError(ctx, HttpResponseStatus.BAD_REQUEST); return; } Map parse = parseUrl(req.uri()); String streamUrl = parse.get("url"); String autoCloseStr = parse.get("autoClose"); if (StrUtil.isBlank(streamUrl)) { log.info("url有误"); sendError(ctx, HttpResponseStatus.BAD_REQUEST); return; } Camera camera = new Camera(); camera.setUrl(streamUrl); //是否需要自动关闭流 boolean autoClose = true; if (StrUtil.isNotBlank(autoCloseStr)) { if ("false".equals(autoCloseStr)) { autoClose = false; cameraRepository.add(camera); } } if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { // http请求 sendFlvReqHeader(ctx); mediaService.playForHttp(camera, ctx, autoClose); } else { // websocket握手,请求升级 // 参数分别是ws地址,子协议,是否扩展,最大frame长度 WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory( getWebSocketLocation(req), null, true, 5 * 1024 * 1024); handshaker = factory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), req); mediaService.playForWs(camera, ctx, autoClose); } } } else if (msg instanceof WebSocketFrame) { handleWebSocketRequest(ctx, (WebSocketFrame) msg); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 添加连接 } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 断开连接 } /** * ws握手地址 */ private String getWebSocketLocation(FullHttpRequest request) { String location = request.headers().get(HttpHeaderNames.HOST) + request.uri(); return "ws://" + location; } /** * 发送req header,告知浏览器是flv格式 * * @param ctx */ private void sendFlvReqHeader(ChannelHandlerContext ctx) { HttpResponse rsp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); rsp.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE) .set(HttpHeaderNames.CONTENT_TYPE, "video/x-flv").set(HttpHeaderNames.ACCEPT_RANGES, "bytes") .set(HttpHeaderNames.PRAGMA, "no-cache").set(HttpHeaderNames.CACHE_CONTROL, "no-cache") .set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED).set(HttpHeaderNames.SERVER, "zhang"); ctx.writeAndFlush(rsp); } /** * 错误请求响应 * * @param ctx * @param status */ private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, Unpooled.copiedBuffer("请求地址有误: " + status + "\r\n", CharsetUtil.UTF_8)); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8"); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } /** * websocket处理 * * @param ctx * @param frame */ private void handleWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame frame) { // 关闭 if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } // 握手PING/PONG if (frame instanceof PingWebSocketFrame) { ctx.write(new PongWebSocketFrame(frame.content().retain())); return; } // 文本 if (frame instanceof TextWebSocketFrame) { return; } if (frame instanceof BinaryWebSocketFrame) { return; } } private Map parseUrl(String url) { String[] split = url.split("url="); String urlParent = split[1]; Map pMap = new HashMap(); //有autoClose if(urlParent.indexOf("autoClose") != -1) { String[] urlChild = urlParent.split("\\?"); if(urlChild.length > 1) { String rUrl = urlChild[0]; String fullParam = urlChild[1]; String[] params = fullParam.split("&"); StringBuffer sb = new StringBuffer(); sb.append(rUrl); sb.append("?"); for (String param : params) { if(param.indexOf("autoClose") == -1) { sb.append(param); sb.append("&"); } else { String[] as = param.split("="); pMap.put(as[0], as[1]); } } int lastIndexOf = sb.toString().lastIndexOf("&"); String xxx = sb.toString().substring(0, lastIndexOf); pMap.put("url", xxx); } } else { pMap.put("url", urlParent); } return pMap; } }