MediaTransferFlvByFFmpeg.java 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631
  1. package com.zj.thread;
  2. import java.io.BufferedReader;
  3. import java.io.ByteArrayOutputStream;
  4. import java.io.DataInputStream;
  5. import java.io.IOException;
  6. import java.io.InputStreamReader;
  7. import java.net.InetAddress;
  8. import java.net.ServerSocket;
  9. import java.net.Socket;
  10. import java.net.SocketTimeoutException;
  11. import java.util.ArrayList;
  12. import java.util.List;
  13. import java.util.Map.Entry;
  14. import java.util.concurrent.ConcurrentHashMap;
  15. import org.bytedeco.javacv.FrameGrabber.Exception;
  16. import com.zj.common.ClientType;
  17. import com.zj.common.MediaConstant;
  18. import com.zj.dto.CameraDto;
  19. import com.zj.service.MediaService;
  20. import cn.hutool.core.collection.CollUtil;
  21. import io.netty.buffer.Unpooled;
  22. import io.netty.channel.ChannelFuture;
  23. import io.netty.channel.ChannelHandlerContext;
  24. import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
  25. import io.netty.util.concurrent.Future;
  26. import io.netty.util.concurrent.GenericFutureListener;
  27. import lombok.extern.slf4j.Slf4j;
  28. /**
  29. *
  30. * 使用ffmpeg推拉流,可以说无敌了
  31. *
  32. * 优点:支持各种杂七杂八的流,兼容性比较好,稳定,不容易出错,自身带有重连机制,可以自己使用命令封装 缺点:系统会存在多个ffmpeg进程,
  33. * 无法直接操作帧,延迟优化没javacv方便
  34. *
  35. * @author ZJ
  36. *
  37. */
  38. @Slf4j
  39. public class MediaTransferFlvByFFmpeg extends MediaTransfer {
  40. /**
  41. * ws客户端
  42. */
  43. private ConcurrentHashMap<String, ChannelHandlerContext> wsClients = new ConcurrentHashMap<>();
  44. /**
  45. * http客户端
  46. */
  47. private ConcurrentHashMap<String, ChannelHandlerContext> httpClients = new ConcurrentHashMap<>();
  48. /**
  49. * flv header
  50. */
  51. private byte[] header = null;
  52. /**
  53. * 相机
  54. */
  55. private CameraDto cameraDto;
  56. private List<String> command = new ArrayList<>();
  57. private ServerSocket tcpServer = null;
  58. private Process process;
  59. private Thread inputThread;
  60. private Thread errThread;
  61. private Thread outputThread;
  62. private Thread listenThread;
  63. private boolean running = false; // 启动
  64. private boolean enableLog = true;
  65. private int hcSize, wcSize = 0;
  66. // 记录当前
  67. long currentTimeMillis = System.currentTimeMillis();
  68. /**
  69. * 用于没有客户端时候的计时
  70. */
  71. private int noClient = 0;
  /**
   * Creates a transfer that only knows the ffmpeg executable; the caller supplies every
   * argument through {@code addArgument}.
   *
   * <p>The default command is deliberately NOT built here: it needs the camera URL, and no
   * camera is available in this constructor (building it used to fail with a
   * {@link NullPointerException}).
   *
   * @param executable path of the ffmpeg binary
   */
  public MediaTransferFlvByFFmpeg(final String executable) {
    command.add(executable);
  }

  /**
   * Creates a transfer for the given camera using the ffmpeg path stored in the system property
   * named by {@code MediaConstant.ffmpegPathKey}.
   *
   * @param cameraDto camera to stream; must not be null
   */
  public MediaTransferFlvByFFmpeg(CameraDto cameraDto) {
    this(System.getProperty(MediaConstant.ffmpegPathKey), cameraDto);
  }

  /**
   * Creates a transfer for the given camera with console logging enabled.
   *
   * @param executable path of the ffmpeg binary
   * @param cameraDto camera to stream; must not be null
   */
  public MediaTransferFlvByFFmpeg(final String executable, CameraDto cameraDto) {
    this(executable, cameraDto, true);
  }

  /**
   * Creates a transfer for the given camera and builds the default transcoding command.
   *
   * @param executable path of the ffmpeg binary
   * @param cameraDto camera to stream; must not be null
   * @param enableLog whether ffmpeg console output is forwarded to the log
   */
  public MediaTransferFlvByFFmpeg(final String executable, CameraDto cameraDto, boolean enableLog) {
    command.add(executable);
    this.cameraDto = cameraDto;
    this.enableLog = enableLog;
    buildCommand();
  }
  92. public boolean isEnableLog() {
  93. return enableLog;
  94. }
  95. public void setEnableLog(boolean enableLog) {
  96. this.enableLog = enableLog;
  97. }
  98. public boolean isRunning() {
  99. return running;
  100. }
  101. public void setRunning(boolean running) {
  102. this.running = running;
  103. }
  104. private MediaTransferFlvByFFmpeg addArgument(String argument) {
  105. command.add(argument);
  106. return this;
  107. }
  108. /**
  109. * 构建ffmpeg转码命令,新版javacv移除libx264,使用libopenh264
  110. * 查看显卡硬件加速支持的选项ffmpeg -hwaccels
  111. * 查看ffmpeg支持选项
  112. * linux:ffmpeg -codecs | grep cuvid,
  113. * window:ffmpeg -codecs | findstr cuvid
  114. * h264_nvenc
  115. * ffmpeg -hwaccel cuvid -c:v h264_cuvid -rtsp_transport tcp -i "rtsp地址" -c:v h264_nvenc -b:v 500k -vf scale_npp=1280:-1 -y /home/2.mp4
  116. *
  117. * -hwaccel cuvid:指定使用cuvid硬件加速
  118. * -c:v h264_cuvid:使用h264_cuvid进行视频解码
  119. * -c:v h264_nvenc:使用h264_nvenc进行视频编码
  120. * -vf scale_npp=1280:-1:指定输出视频的宽高,注意,这里和软解码时使用的-vf scale=x:x不一样
  121. *
  122. * 转码期间nvidia-smi查看显卡状态
  123. * -hwaccel_device N 指定某颗GPU执行转码任务
  124. */
  125. private void buildCommand() {
  126. this.addArgument("-rtsp_transport").addArgument("tcp").addArgument("-i").addArgument(cameraDto.getUrl())
  127. .addArgument("-max_delay").addArgument("1")
  128. // .addArgument("-strict").addArgument("experimental")
  129. .addArgument("-g").addArgument("25").addArgument("-r").addArgument("25")
  130. // .addArgument("-b").addArgument("200000")
  131. // .addArgument("-filter_complex").addArgument("setpts='(RTCTIME - RTCSTART) / (TB * 1000000)'")
  132. .addArgument("-c:v").addArgument("libopenh264").addArgument("-preset:v").addArgument("ultrafast")
  133. // .addArgument("-preset:v").addArgument("fast")
  134. .addArgument("-tune:v").addArgument("zerolatency")
  135. // .addArgument("-crf").addArgument("26")
  136. .addArgument("-c:a").addArgument("aac")
  137. // .addArgument("-qmin").addArgument("28")
  138. // .addArgument("-qmax").addArgument("32")
  139. // .addArgument("-b:v").addArgument("448k")
  140. // .addArgument("-b:a").addArgument("64k")
  141. .addArgument("-f").addArgument("flv");
  142. }
  143. /**
  144. * 转封装命令
  145. */
  146. private void buildCopyCommand() {
  147. this.addArgument("-rtsp_transport").addArgument("tcp").addArgument("-i").addArgument(cameraDto.getUrl())
  148. .addArgument("-max_delay").addArgument("1")
  149. .addArgument("-g").addArgument("25").addArgument("-r").addArgument("25")
  150. .addArgument("-c:v").addArgument("copy")
  151. .addArgument("-c:a").addArgument("copy")
  152. .addArgument("-f").addArgument("flv");
  153. }
  154. // private void buildCommand() {
  155. // this
  156. //// .addArgument("-rtsp_transport").addArgument("tcp")
  157. // .addArgument("-i").addArgument(camera.getUrl())
  158. // .addArgument("-max_delay").addArgument("100")
  159. //// .addArgument("-strict").addArgument("experimental")
  160. // .addArgument("-g").addArgument("10")
  161. //// .addArgument("-r").addArgument("25")
  162. //// .addArgument("-b").addArgument("200000")
  163. //// .addArgument("-filter_complex").addArgument("setpts='(RTCTIME - RTCSTART) / (TB * 1000000)'")
  164. // .addArgument("-c:v").addArgument("libx264")
  165. // .addArgument("-preset:v").addArgument("ultrafast")
  166. // .addArgument("-tune:v").addArgument("zerolatency")
  167. //// .addArgument("-crf").addArgument("26")
  168. // .addArgument("-c:a").addArgument("aac")
  169. // .addArgument("-qmin").addArgument("28")
  170. // .addArgument("-qmax").addArgument("32")
  171. // .addArgument("-b:v").addArgument("448k")
  172. // .addArgument("-b:a").addArgument("64k")
  173. // .addArgument("-f").addArgument("flv");
  174. // }
  175. /**
  176. * 执行推流
  177. *
  178. * @return
  179. */
  180. public MediaTransferFlvByFFmpeg execute() {
  181. String output = getOutput();
  182. command.add(output);
  183. String join = CollUtil.join(command, " ");
  184. log.info(join);
  185. try {
  186. process = new ProcessBuilder(command).start();
  187. running = true;
  188. listenNetTimeout();
  189. dealStream(process);
  190. outputData();
  191. listenClient();
  192. } catch (IOException e) {
  193. e.printStackTrace();
  194. }
  195. return this;
  196. }
  197. /**
  198. * flv数据
  199. */
  200. private void outputData() {
  201. outputThread = new Thread(new Runnable() {
  202. public void run() {
  203. Socket client = null;
  204. try {
  205. client = tcpServer.accept();
  206. DataInputStream input = new DataInputStream(client.getInputStream());
  207. byte[] buffer = new byte[1024];
  208. int len = 0;
  209. ByteArrayOutputStream bos = new ByteArrayOutputStream();
  210. while (running) {
  211. len = input.read(buffer);
  212. if (len == -1) {
  213. break;
  214. }
  215. bos.write(buffer, 0, len);
  216. if (header == null) {
  217. header = bos.toByteArray();
  218. // System.out.println(HexUtil.encodeHexStr(header));
  219. bos.reset();
  220. continue;
  221. }
  222. // 帧数据
  223. byte[] data = bos.toByteArray();
  224. bos.reset();
  225. // 发送到前端
  226. sendFrameData(data);
  227. }
  228. try {
  229. client.close();
  230. } catch (java.lang.Exception e) {
  231. }
  232. try {
  233. input.close();
  234. } catch (java.lang.Exception e) {
  235. }
  236. try {
  237. bos.close();
  238. } catch (java.lang.Exception e) {
  239. }
  240. log.info("关闭媒体流-ffmpeg,{} ", cameraDto.getUrl());
  241. } catch (SocketTimeoutException e1) {
  242. // e1.printStackTrace();
  243. // 超时关闭
  244. } catch (IOException e) {
  245. // e.printStackTrace();
  246. } finally {
  247. MediaService.cameras.remove(cameraDto.getMediaKey());
  248. running = false;
  249. process.destroy();
  250. try {
  251. if (null != client) {
  252. client.close();
  253. }
  254. } catch (IOException e) {
  255. }
  256. try {
  257. if (null != tcpServer) {
  258. tcpServer.close();
  259. }
  260. } catch (IOException e) {
  261. }
  262. }
  263. }
  264. });
  265. outputThread.start();
  266. }
  267. /**
  268. * 监听客户端
  269. */
  270. public void listenClient() {
  271. listenThread = new Thread(new Runnable() {
  272. public void run() {
  273. while (running) {
  274. hasClient();
  275. try {
  276. Thread.sleep(1000);
  277. } catch (InterruptedException e) {
  278. }
  279. }
  280. }
  281. });
  282. listenThread.start();
  283. }
  284. /**
  285. * 监听网络异常超时
  286. */
  287. public void listenNetTimeout() {
  288. Thread listenNetTimeoutThread = new Thread(new Runnable() {
  289. public void run() {
  290. while (true) {
  291. if ((System.currentTimeMillis() - currentTimeMillis) > 15000) {
  292. log.info("网络异常超时");
  293. MediaService.cameras.remove(cameraDto.getMediaKey());
  294. stopFFmpeg();
  295. break;
  296. }
  297. try {
  298. Thread.sleep(5000);
  299. } catch (InterruptedException e) {
  300. }
  301. }
  302. }
  303. });
  304. listenNetTimeoutThread.setDaemon(true);
  305. listenNetTimeoutThread.start();
  306. }
  307. public static MediaTransferFlvByFFmpeg atPath() {
  308. return atPath(null);
  309. }
  310. public static MediaTransferFlvByFFmpeg atPath(final String absPath) {
  311. final String executable;
  312. if (absPath != null) {
  313. executable = absPath;
  314. } else {
  315. // executable = "ffmpeg";
  316. executable = System.getProperty(MediaConstant.ffmpegPathKey);
  317. }
  318. return new MediaTransferFlvByFFmpeg(executable);
  319. }
  320. /**
  321. * 控制台输出
  322. *
  323. * @param process
  324. */
  325. private void dealStream(Process process) {
  326. if (process == null) {
  327. return;
  328. }
  329. // 处理InputStream的线程
  330. inputThread = new Thread() {
  331. @Override
  332. public void run() {
  333. BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream()));
  334. String line = null;
  335. try {
  336. while (running) {
  337. line = in.readLine();
  338. currentTimeMillis = System.currentTimeMillis();
  339. if (line == null) {
  340. break;
  341. }
  342. if (enableLog) {
  343. log.info("output: " + line);
  344. }
  345. }
  346. } catch (IOException e) {
  347. e.printStackTrace();
  348. } finally {
  349. try {
  350. running = false;
  351. in.close();
  352. } catch (IOException e) {
  353. e.printStackTrace();
  354. }
  355. }
  356. }
  357. };
  358. // 处理ErrorStream的线程
  359. errThread = new Thread() {
  360. @Override
  361. public void run() {
  362. BufferedReader err = new BufferedReader(new InputStreamReader(process.getErrorStream()));
  363. String line = null;
  364. try {
  365. while (running) {
  366. line = err.readLine();
  367. currentTimeMillis = System.currentTimeMillis();
  368. if (line == null) {
  369. break;
  370. }
  371. if (enableLog) {
  372. log.info("err: " + line);
  373. }
  374. }
  375. } catch (IOException e) {
  376. e.printStackTrace();
  377. } finally {
  378. try {
  379. running = false;
  380. err.close();
  381. } catch (IOException e) {
  382. e.printStackTrace();
  383. }
  384. }
  385. }
  386. };
  387. inputThread.start();
  388. errThread.start();
  389. }
  390. /**
  391. * 输出到tcp
  392. *
  393. * @return
  394. */
  395. private String getOutput() {
  396. try {
  397. tcpServer = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
  398. StringBuffer sb = new StringBuffer();
  399. sb.append("tcp://");
  400. sb.append(tcpServer.getInetAddress().getHostAddress());
  401. sb.append(":");
  402. sb.append(tcpServer.getLocalPort());
  403. tcpServer.setSoTimeout(10000);
  404. return sb.toString();
  405. } catch (IOException e) {
  406. e.printStackTrace();
  407. }
  408. new RuntimeException("无法启用端口");
  409. return "";
  410. }
  411. /**
  412. * 关闭
  413. */
  414. public void stopFFmpeg() {
  415. this.running = false;
  416. try {
  417. this.process.destroy();
  418. log.info("关闭媒体流-ffmpeg,{} ", cameraDto.getUrl());
  419. } catch (java.lang.Exception e) {
  420. process.destroyForcibly();
  421. }
  422. // 媒体异常时,主动断开前端长连接
  423. for (Entry<String, ChannelHandlerContext> entry : wsClients.entrySet()) {
  424. try {
  425. entry.getValue().close();
  426. } catch (java.lang.Exception e) {
  427. } finally {
  428. wsClients.remove(entry.getKey());
  429. }
  430. }
  431. for (Entry<String, ChannelHandlerContext> entry : httpClients.entrySet()) {
  432. try {
  433. entry.getValue().close();
  434. } catch (java.lang.Exception e) {
  435. } finally {
  436. httpClients.remove(entry.getKey());
  437. }
  438. }
  439. }
  440. /**
  441. * 关闭流
  442. *
  443. * @return
  444. */
  445. public void hasClient() {
  446. int newHcSize = httpClients.size();
  447. int newWcSize = wsClients.size();
  448. if (hcSize != newHcSize || wcSize != newWcSize) {
  449. hcSize = newHcSize;
  450. wcSize = newWcSize;
  451. log.info("\r\n{}\r\nhttp连接数:{}, ws连接数:{} \r\n", cameraDto.getUrl(), newHcSize, newWcSize);
  452. }
  453. // 无需自动关闭
  454. if (!cameraDto.isAutoClose()) {
  455. return;
  456. }
  457. if (httpClients.isEmpty() && wsClients.isEmpty()) {
  458. // 等待20秒还没有客户端,则关闭推流
  459. if (noClient > cameraDto.getNoClientsDuration()) {
  460. running = false;
  461. MediaService.cameras.remove(cameraDto.getMediaKey());
  462. } else {
  463. noClient += 1000;
  464. // log.info("\r\n{}\r\n {} 秒自动关闭推拉流 \r\n", camera.getUrl(), noClientsDuration-noClient);
  465. }
  466. } else {
  467. // 重置计时
  468. noClient = 0;
  469. }
  470. }
  471. /**
  472. * 发送帧数据
  473. *
  474. * @param data
  475. */
  476. private void sendFrameData(byte[] data) {
  477. // ws
  478. for (Entry<String, ChannelHandlerContext> entry : wsClients.entrySet()) {
  479. try {
  480. if (entry.getValue().channel().isWritable()) {
  481. entry.getValue().writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(data)));
  482. } else {
  483. wsClients.remove(entry.getKey());
  484. hasClient();
  485. }
  486. } catch (java.lang.Exception e) {
  487. wsClients.remove(entry.getKey());
  488. hasClient();
  489. e.printStackTrace();
  490. }
  491. }
  492. // http
  493. for (Entry<String, ChannelHandlerContext> entry : httpClients.entrySet()) {
  494. try {
  495. if (entry.getValue().channel().isWritable()) {
  496. entry.getValue().writeAndFlush(Unpooled.copiedBuffer(data));
  497. } else {
  498. httpClients.remove(entry.getKey());
  499. hasClient();
  500. }
  501. } catch (java.lang.Exception e) {
  502. httpClients.remove(entry.getKey());
  503. hasClient();
  504. e.printStackTrace();
  505. }
  506. }
  507. }
  508. /**
  509. * 新增客户端
  510. *
  511. * @param ctx netty client
  512. * @param ctype enum,ClientType
  513. */
  514. public void addClient(ChannelHandlerContext ctx, ClientType ctype) {
  515. int timeout = 0;
  516. while (true) {
  517. try {
  518. if (header != null) {
  519. try {
  520. if (ctx.channel().isWritable()) {
  521. // 发送帧前先发送header
  522. if (ClientType.HTTP.getType() == ctype.getType()) {
  523. ChannelFuture future = ctx.writeAndFlush(Unpooled.copiedBuffer(header));
  524. future.addListener(new GenericFutureListener<Future<? super Void>>() {
  525. @Override
  526. public void operationComplete(Future<? super Void> future) throws Exception {
  527. if (future.isSuccess()) {
  528. httpClients.put(ctx.channel().id().toString(), ctx);
  529. }
  530. }
  531. });
  532. } else if (ClientType.WEBSOCKET.getType() == ctype.getType()) {
  533. ChannelFuture future = ctx
  534. .writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(header)));
  535. future.addListener(new GenericFutureListener<Future<? super Void>>() {
  536. @Override
  537. public void operationComplete(Future<? super Void> future) throws Exception {
  538. if (future.isSuccess()) {
  539. wsClients.put(ctx.channel().id().toString(), ctx);
  540. }
  541. }
  542. });
  543. }
  544. }
  545. } catch (java.lang.Exception e) {
  546. e.printStackTrace();
  547. }
  548. break;
  549. }
  550. // 等待推拉流启动
  551. Thread.sleep(50);
  552. // 启动录制器失败
  553. timeout += 50;
  554. if (timeout > 30000) {
  555. break;
  556. }
  557. } catch (java.lang.Exception e) {
  558. e.printStackTrace();
  559. }
  560. }
  561. }
  562. public static void main(String[] args) throws Exception {
  563. // ServerSocket serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
  564. // System.out.println(serverSocket.getLocalPort());
  565. // System.out.println(serverSocket.getInetAddress().getHostAddress());
  566. MediaTransferFlvByFFmpeg.atPath().addArgument("-i")
  567. .addArgument("rtsp://admin:VZCDOY@192.168.2.84:554/Streaming/Channels/102").addArgument("-g")
  568. .addArgument("5").addArgument("-c:v").addArgument("libx264").addArgument("-c:a").addArgument("aac")
  569. .addArgument("-f").addArgument("flv").execute();
  570. }
  571. }