MediaTransferFlvByFFmpeg.java 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602
  1. package com.zj.thread;
  2. import java.io.BufferedReader;
  3. import java.io.DataInputStream;
  4. import java.io.IOException;
  5. import java.io.InputStreamReader;
  6. import java.net.InetAddress;
  7. import java.net.ServerSocket;
  8. import java.net.Socket;
  9. import java.net.SocketTimeoutException;
  10. import java.util.ArrayList;
  11. import java.util.List;
  12. import java.util.Map.Entry;
  13. import org.bytedeco.javacv.FrameGrabber.Exception;
  14. import com.zj.common.ClientType;
  15. import com.zj.common.MediaConstant;
  16. import com.zj.dto.CameraDto;
  17. import com.zj.service.MediaService;
  18. import cn.hutool.core.collection.CollUtil;
  19. import cn.hutool.core.thread.ThreadUtil;
  20. import io.netty.buffer.Unpooled;
  21. import io.netty.channel.ChannelFuture;
  22. import io.netty.channel.ChannelHandlerContext;
  23. import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
  24. import io.netty.util.concurrent.Future;
  25. import io.netty.util.concurrent.GenericFutureListener;
  26. import lombok.extern.slf4j.Slf4j;
  27. /**
  28. *
  29. * 使用ffmpeg推拉流,可以说无敌了
  30. *
  31. * 优点:支持各种杂七杂八的流,兼容性比较好,稳定,不容易出错,自身带有重连机制,可以自己使用命令封装 缺点:系统会存在多个ffmpeg进程,
  32. * 无法直接操作帧,延迟优化没javacv方便
  33. *
  34. * @author ZJ
  35. *
  36. */
  37. @Slf4j
  38. public class MediaTransferFlvByFFmpeg extends MediaTransfer {
  39. /**
  40. * 相机
  41. */
  42. private CameraDto cameraDto;
  43. private List<String> command = new ArrayList<>();
  44. private ServerSocket tcpServer = null;
  45. private Process process;
  46. private Thread inputThread;
  47. private Thread errThread;
  48. private Thread outputThread;
  49. private boolean running = false; // 启动
  50. private boolean enableLog = true;
  51. // 记录当前
  52. long currentTimeMillis = System.currentTimeMillis();
  53. public MediaTransferFlvByFFmpeg(final String executable) {
  54. command.add(executable);
  55. buildCommand();
  56. }
  57. public MediaTransferFlvByFFmpeg(CameraDto cameraDto) {
  58. command.add(System.getProperty(MediaConstant.ffmpegPathKey));
  59. this.cameraDto = cameraDto;
  60. buildCommand();
  61. }
  62. public MediaTransferFlvByFFmpeg(final String executable, CameraDto cameraDto) {
  63. command.add(executable);
  64. this.cameraDto = cameraDto;
  65. buildCommand();
  66. }
  67. public MediaTransferFlvByFFmpeg(final String executable, CameraDto cameraDto, boolean enableLog) {
  68. command.add(executable);
  69. this.cameraDto = cameraDto;
  70. this.enableLog = enableLog;
  71. buildCommand();
  72. }
  73. public boolean isEnableLog() {
  74. return enableLog;
  75. }
  76. public void setEnableLog(boolean enableLog) {
  77. this.enableLog = enableLog;
  78. }
  79. public boolean isRunning() {
  80. return running;
  81. }
  82. public void setRunning(boolean running) {
  83. this.running = running;
  84. }
  85. private MediaTransferFlvByFFmpeg addArgument(String argument) {
  86. command.add(argument);
  87. return this;
  88. }
  89. /**
  90. * 构建ffmpeg转码命令,新版javacv移除libx264,使用libopenh264 查看显卡硬件加速支持的选项ffmpeg -hwaccels
  91. * 查看ffmpeg支持选项 linux:ffmpeg -codecs | grep cuvid, window:ffmpeg -codecs |
  92. * findstr cuvid h264_nvenc ffmpeg -hwaccel cuvid -c:v h264_cuvid
  93. * -rtsp_transport tcp -i "rtsp地址" -c:v h264_nvenc -b:v 500k -vf
  94. * scale_npp=1280:-1 -y /home/2.mp4
  95. *
  96. * -hwaccel cuvid:指定使用cuvid硬件加速 -c:v h264_cuvid:使用h264_cuvid进行视频解码 -c:v
  97. * h264_nvenc:使用h264_nvenc进行视频编码 -vf
  98. * scale_npp=1280:-1:指定输出视频的宽高,注意,这里和软解码时使用的-vf scale=x:x不一样
  99. *
  100. * 转码期间nvidia-smi查看显卡状态 -hwaccel_device N 指定某颗GPU执行转码任务
  101. */
  102. private void buildCommand() {
  103. // 如果为rtsp流,增加配置
  104. if ("rtsp".equals(cameraDto.getUrl().substring(0, 4))) {
  105. this.addArgument("-rtsp_transport").addArgument("tcp");
  106. }
  107. //如果是本地文件
  108. if(cameraDto.getType() == 1) {
  109. this.addArgument("-re");
  110. }
  111. this.addArgument("-i").addArgument(cameraDto.getUrl())
  112. .addArgument("-max_delay").addArgument("1")
  113. // .addArgument("-strict").addArgument("experimental")
  114. .addArgument("-g").addArgument("25").addArgument("-r").addArgument("25")
  115. // .addArgument("-b").addArgument("200000")
  116. // .addArgument("-filter_complex").addArgument("setpts='(RTCTIME - RTCSTART) / (TB * 1000000)'")
  117. .addArgument("-c:v").addArgument("libopenh264").addArgument("-preset:v").addArgument("ultrafast")
  118. // .addArgument("-preset:v").addArgument("fast")
  119. .addArgument("-tune:v").addArgument("zerolatency")
  120. // .addArgument("-crf").addArgument("26")
  121. .addArgument("-c:a").addArgument("aac")
  122. // .addArgument("-qmin").addArgument("28")
  123. // .addArgument("-qmax").addArgument("32")
  124. // .addArgument("-b:v").addArgument("448k")
  125. // .addArgument("-b:a").addArgument("64k")
  126. .addArgument("-f").addArgument("flv");
  127. }
  128. /**
  129. * 转封装命令
  130. */
  131. private void buildCopyCommand() {
  132. this.addArgument("-rtsp_transport").addArgument("tcp").addArgument("-i").addArgument(cameraDto.getUrl())
  133. .addArgument("-max_delay").addArgument("1").addArgument("-g").addArgument("25").addArgument("-r")
  134. .addArgument("25").addArgument("-c:v").addArgument("copy").addArgument("-c:a").addArgument("copy")
  135. .addArgument("-f").addArgument("flv");
  136. }
  137. // private void buildCommand() {
  138. // this
  139. //// .addArgument("-rtsp_transport").addArgument("tcp")
  140. // .addArgument("-i").addArgument(camera.getUrl())
  141. // .addArgument("-max_delay").addArgument("100")
  142. //// .addArgument("-strict").addArgument("experimental")
  143. // .addArgument("-g").addArgument("10")
  144. //// .addArgument("-r").addArgument("25")
  145. //// .addArgument("-b").addArgument("200000")
  146. //// .addArgument("-filter_complex").addArgument("setpts='(RTCTIME - RTCSTART) / (TB * 1000000)'")
  147. // .addArgument("-c:v").addArgument("libx264")
  148. // .addArgument("-preset:v").addArgument("ultrafast")
  149. // .addArgument("-tune:v").addArgument("zerolatency")
  150. //// .addArgument("-crf").addArgument("26")
  151. // .addArgument("-c:a").addArgument("aac")
  152. // .addArgument("-qmin").addArgument("28")
  153. // .addArgument("-qmax").addArgument("32")
  154. // .addArgument("-b:v").addArgument("448k")
  155. // .addArgument("-b:a").addArgument("64k")
  156. // .addArgument("-f").addArgument("flv");
  157. // }
  158. /**
  159. * 执行推流
  160. *
  161. * @return
  162. */
  163. public MediaTransferFlvByFFmpeg execute() {
  164. String output = getOutput();
  165. command.add(output);
  166. String join = CollUtil.join(command, " ");
  167. log.info(join);
  168. try {
  169. process = new ProcessBuilder(command).start();
  170. running = true;
  171. listenNetTimeout();
  172. dealStream(process);
  173. outputData();
  174. listenClient();
  175. } catch (IOException e) {
  176. e.printStackTrace();
  177. }
  178. return this;
  179. }
  180. /**
  181. * flv数据
  182. */
  183. private void outputData() {
  184. outputThread = new Thread(new Runnable() {
  185. public void run() {
  186. Socket client = null;
  187. try {
  188. client = tcpServer.accept();
  189. DataInputStream input = new DataInputStream(client.getInputStream());
  190. byte[] buffer = new byte[1024];
  191. int len = 0;
  192. // ByteArrayOutputStream bos = new ByteArrayOutputStream();
  193. while (running) {
  194. len = input.read(buffer);
  195. if (len == -1) {
  196. break;
  197. }
  198. bos.write(buffer, 0, len);
  199. if (header == null) {
  200. header = bos.toByteArray();
  201. // System.out.println(HexUtil.encodeHexStr(header));
  202. bos.reset();
  203. continue;
  204. }
  205. // 帧数据
  206. byte[] data = bos.toByteArray();
  207. bos.reset();
  208. // 发送到前端
  209. sendFrameData(data);
  210. }
  211. try {
  212. client.close();
  213. } catch (java.lang.Exception e) {
  214. }
  215. try {
  216. input.close();
  217. } catch (java.lang.Exception e) {
  218. }
  219. try {
  220. bos.close();
  221. } catch (java.lang.Exception e) {
  222. }
  223. log.info("关闭媒体流-ffmpeg,{} ", cameraDto.getUrl());
  224. } catch (SocketTimeoutException e1) {
  225. // e1.printStackTrace();
  226. // 超时关闭
  227. } catch (IOException e) {
  228. // e.printStackTrace();
  229. } finally {
  230. MediaService.cameras.remove(cameraDto.getMediaKey());
  231. running = false;
  232. process.destroy();
  233. try {
  234. if (null != client) {
  235. client.close();
  236. }
  237. } catch (IOException e) {
  238. }
  239. try {
  240. if (null != tcpServer) {
  241. tcpServer.close();
  242. }
  243. } catch (IOException e) {
  244. }
  245. }
  246. }
  247. });
  248. outputThread.start();
  249. }
  250. /**
  251. * 监听客户端
  252. */
  253. public void listenClient() {
  254. MediaListenThread.putThread(this);
  255. }
  256. /**
  257. * 监听网络异常超时
  258. */
  259. public void listenNetTimeout() {
  260. Thread listenNetTimeoutThread = new Thread(new Runnable() {
  261. public void run() {
  262. while (true) {
  263. if ((System.currentTimeMillis() - currentTimeMillis) > 15000) {
  264. log.info("网络异常超时");
  265. MediaService.cameras.remove(cameraDto.getMediaKey());
  266. stopFFmpeg();
  267. break;
  268. }
  269. try {
  270. Thread.sleep(5000);
  271. } catch (InterruptedException e) {
  272. }
  273. }
  274. }
  275. });
  276. listenNetTimeoutThread.setDaemon(true);
  277. listenNetTimeoutThread.start();
  278. }
  279. public static MediaTransferFlvByFFmpeg atPath() {
  280. return atPath(null);
  281. }
  282. public static MediaTransferFlvByFFmpeg atPath(final String absPath) {
  283. final String executable;
  284. if (absPath != null) {
  285. executable = absPath;
  286. } else {
  287. // executable = "ffmpeg";
  288. executable = System.getProperty(MediaConstant.ffmpegPathKey);
  289. }
  290. return new MediaTransferFlvByFFmpeg(executable);
  291. }
  292. /**
  293. * 控制台输出
  294. *
  295. * @param process
  296. */
  297. private void dealStream(Process process) {
  298. if (process == null) {
  299. return;
  300. }
  301. // 处理InputStream的线程
  302. inputThread = new Thread() {
  303. @Override
  304. public void run() {
  305. BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream()));
  306. String line = null;
  307. try {
  308. while (running) {
  309. line = in.readLine();
  310. currentTimeMillis = System.currentTimeMillis();
  311. if (line == null) {
  312. break;
  313. }
  314. if (enableLog) {
  315. log.info("output: " + line);
  316. }
  317. }
  318. } catch (IOException e) {
  319. e.printStackTrace();
  320. } finally {
  321. try {
  322. running = false;
  323. in.close();
  324. } catch (IOException e) {
  325. e.printStackTrace();
  326. }
  327. }
  328. }
  329. };
  330. // 处理ErrorStream的线程
  331. errThread = new Thread() {
  332. @Override
  333. public void run() {
  334. BufferedReader err = new BufferedReader(new InputStreamReader(process.getErrorStream()));
  335. String line = null;
  336. try {
  337. while (running) {
  338. line = err.readLine();
  339. currentTimeMillis = System.currentTimeMillis();
  340. if (line == null) {
  341. break;
  342. }
  343. if (enableLog) {
  344. log.info("ffmpeg: " + line);
  345. }
  346. }
  347. } catch (IOException e) {
  348. e.printStackTrace();
  349. } finally {
  350. try {
  351. running = false;
  352. err.close();
  353. } catch (IOException e) {
  354. e.printStackTrace();
  355. }
  356. }
  357. }
  358. };
  359. inputThread.start();
  360. errThread.start();
  361. }
  362. /**
  363. * 输出到tcp
  364. *
  365. * @return
  366. */
  367. private String getOutput() {
  368. try {
  369. tcpServer = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
  370. StringBuffer sb = new StringBuffer();
  371. sb.append("tcp://");
  372. sb.append(tcpServer.getInetAddress().getHostAddress());
  373. sb.append(":");
  374. sb.append(tcpServer.getLocalPort());
  375. tcpServer.setSoTimeout(10000);
  376. return sb.toString();
  377. } catch (IOException e) {
  378. e.printStackTrace();
  379. }
  380. new RuntimeException("无法启用端口");
  381. return "";
  382. }
  383. /**
  384. * 关闭
  385. */
  386. public void stopFFmpeg() {
  387. this.running = false;
  388. try {
  389. this.process.destroy();
  390. log.info("关闭媒体流-ffmpeg,{} ", cameraDto.getUrl());
  391. } catch (java.lang.Exception e) {
  392. process.destroyForcibly();
  393. }
  394. // 媒体异常时,主动断开前端长连接
  395. for (Entry<String, ChannelHandlerContext> entry : wsClients.entrySet()) {
  396. try {
  397. entry.getValue().close();
  398. } catch (java.lang.Exception e) {
  399. } finally {
  400. wsClients.remove(entry.getKey());
  401. }
  402. }
  403. for (Entry<String, ChannelHandlerContext> entry : httpClients.entrySet()) {
  404. try {
  405. entry.getValue().close();
  406. } catch (java.lang.Exception e) {
  407. } finally {
  408. httpClients.remove(entry.getKey());
  409. }
  410. }
  411. }
  412. /**
  413. * 关闭流
  414. *
  415. * @return
  416. */
  417. @Override
  418. public void hasClient() {
  419. int newHcSize = httpClients.size();
  420. int newWcSize = wsClients.size();
  421. if (hcSize != newHcSize || wcSize != newWcSize) {
  422. hcSize = newHcSize;
  423. wcSize = newWcSize;
  424. log.info("\r\n{}\r\nhttp连接数:{}, ws连接数:{} \r\n", cameraDto.getUrl(), newHcSize, newWcSize);
  425. }
  426. // 无需自动关闭
  427. if (!cameraDto.isAutoClose()) {
  428. return;
  429. }
  430. if (httpClients.isEmpty() && wsClients.isEmpty()) {
  431. // 等待20秒还没有客户端,则关闭推流
  432. if (noClient > cameraDto.getNoClientsDuration()) {
  433. running = false;
  434. MediaService.cameras.remove(cameraDto.getMediaKey());
  435. } else {
  436. noClient += 1000;
  437. // log.info("\r\n{}\r\n {} 秒自动关闭推拉流 \r\n", camera.getUrl(), noClientsDuration-noClient);
  438. }
  439. } else {
  440. // 重置计时
  441. noClient = 0;
  442. }
  443. }
  444. /**
  445. * 发送帧数据
  446. *
  447. * @param data
  448. */
  449. private void sendFrameData(byte[] data) {
  450. // ws
  451. for (Entry<String, ChannelHandlerContext> entry : wsClients.entrySet()) {
  452. try {
  453. if (entry.getValue().channel().isWritable()) {
  454. entry.getValue().writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(data)));
  455. } else {
  456. wsClients.remove(entry.getKey());
  457. hasClient();
  458. }
  459. } catch (java.lang.Exception e) {
  460. wsClients.remove(entry.getKey());
  461. hasClient();
  462. e.printStackTrace();
  463. }
  464. }
  465. // http
  466. for (Entry<String, ChannelHandlerContext> entry : httpClients.entrySet()) {
  467. try {
  468. if (entry.getValue().channel().isWritable()) {
  469. entry.getValue().writeAndFlush(Unpooled.copiedBuffer(data));
  470. } else {
  471. httpClients.remove(entry.getKey());
  472. hasClient();
  473. }
  474. } catch (java.lang.Exception e) {
  475. httpClients.remove(entry.getKey());
  476. hasClient();
  477. e.printStackTrace();
  478. }
  479. }
  480. }
  481. /**
  482. * 新增客户端
  483. *
  484. * @param ctx netty client
  485. * @param ctype enum,ClientType
  486. */
  487. public void addClient(ChannelHandlerContext ctx, ClientType ctype) {
  488. int timeout = 0;
  489. while (true) {
  490. try {
  491. if (header != null) {
  492. try {
  493. if (ctx.channel().isWritable()) {
  494. // 发送帧前先发送header
  495. if (ClientType.HTTP.getType() == ctype.getType()) {
  496. ChannelFuture future = ctx.writeAndFlush(Unpooled.copiedBuffer(header));
  497. future.addListener(new GenericFutureListener<Future<? super Void>>() {
  498. @Override
  499. public void operationComplete(Future<? super Void> future) throws Exception {
  500. if (future.isSuccess()) {
  501. httpClients.put(ctx.channel().id().toString(), ctx);
  502. }
  503. }
  504. });
  505. } else if (ClientType.WEBSOCKET.getType() == ctype.getType()) {
  506. ChannelFuture future = ctx
  507. .writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(header)));
  508. future.addListener(new GenericFutureListener<Future<? super Void>>() {
  509. @Override
  510. public void operationComplete(Future<? super Void> future) throws Exception {
  511. if (future.isSuccess()) {
  512. wsClients.put(ctx.channel().id().toString(), ctx);
  513. }
  514. }
  515. });
  516. }
  517. }
  518. } catch (java.lang.Exception e) {
  519. e.printStackTrace();
  520. }
  521. break;
  522. }
  523. // 等待推拉流启动
  524. Thread.sleep(50);
  525. // 启动录制器失败
  526. timeout += 50;
  527. if (timeout > 30000) {
  528. break;
  529. }
  530. } catch (java.lang.Exception e) {
  531. e.printStackTrace();
  532. }
  533. }
  534. }
  535. public static void main(String[] args) throws Exception {
  536. // ServerSocket serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
  537. // System.out.println(serverSocket.getLocalPort());
  538. // System.out.println(serverSocket.getInetAddress().getHostAddress());
  539. MediaTransferFlvByFFmpeg.atPath().addArgument("-i")
  540. .addArgument("rtsp://admin:VZCDOY@192.168.2.84:554/Streaming/Channels/102").addArgument("-g")
  541. .addArgument("5").addArgument("-c:v").addArgument("libx264").addArgument("-c:a").addArgument("aac")
  542. .addArgument("-f").addArgument("flv").execute();
  543. }
  544. }