jrtplib

作用

  • jrtplib是一个基于C++编写的面向对象的库,旨在帮助开发人员使用RFC3550中描述的实时传输协议(RTP),目前已经可以运行在Windows、Linux、FreeBSD、 Solaris、Unix和VxWorks等多种操作系统上。
  • 该库使用户能够发送和接收数据使用RTP,无需担心SSRC冲突、调度和传输RTCP数据等。用户只需提供库通过发送有效负载数据,库为用户提供访问权限输入RTP和RTCP数据。
  • 该库提供了几个类,这些类有助于创建RTP应用程序。大多数用户可能只需要RTPSession类来构建应用程序,或者从RTPSecureSession派生一个类来支持SRTP。这些类提供了发送RTP数据的必要功能,并在内部处理RTCP部分。

依赖

  • 依赖: JThread
    • 第一种是用 jthread 库提供的线程自动在后台执行对数据的接收。第二种是用户自己调用 RTPSession 中的 Poll 方法。如果采取第一种方法则要安装 jthread 库
    • 在jrtplib的configure中,会查找系统是否有编译了jthread库,如果有,那么编译的jrtp库会开启对jthread的支持。。

JRTPLIB 2.x系列和3.x系列

网上一般有JRTPLIB 2.x系列和3.x系列两种版本:

简单来说:

  • 2.x系列代码量少使用简单,但是只支持RFC 1889不支持RFC 3550()
  • 3.x支持RFC 3550,但代码量稍多,以及使用也稍显复杂。

详细的说:

  • 最重要的变化之一可能是,3.x版本基于RFC 3550,2x版本基于已经过时的RFC 1889
  • 此外,创建2.x系列的想法是,用户只需要使用RTPSession类,这意味着其他类本身不是很有用。另一方面,该版本旨在提供许多有用的组件,以帮助用户构建支持RTP的应用程序。
  • 在3.x版本中,特定于传输RTP数据包的底层协议的代码捆绑在一个类中,该类从名为RTPTTransmiter的类继承其接口。这使得不同的底层应用程序变得容易需要支持的协议。目前支持IPv4上的UDP和IPv6上的UDP。
  • 对于诸如混音器或转换器之类的应用程序,使用RTPSession类并不是一个好的解决方案。其他组件也可以用于此目的:传输组件、SSRC表、RTCP调度程序等。使用这些组件,构建各种应用程序应该更容易。

编译

下载

首先从JRTPLIB的网站来使用下载最新的源码包,我现在的是jrtplib-3.11.2.zip

在这里插入图片描述
请注意,此版本至少需要JThread 1.3.0
在这里插入图片描述
我下载的是jthread-1.3.3.zip

编译

方法一(推荐)

工程目录如下

在这里插入图片描述

  • include:自己创建的,存放自己写的头文件
  • src:自己创建的,存放自己写的源文件,稍后在下文将会说明
  • lib:自己创建的,存放当前这个工程的所有依赖
    • jthread-1.3.3,是刚刚jthread-1.3.3.zip的解压
    • jrtplib-3.11.2,是刚刚jrtplib-3.11.2.zip 的解压
    • export:存放编译脚本和生成的依赖头文件和库文件。当前写了两个脚本
      • build_jthread.sh:用来编译jthread
      • build_jrtp.sh:用来编译jrtplib

build_jthread.sh

其内容为:

#!/bin/bash
currentPath=$(pwd)

libPath=$(pwd)/../jthread-1.3.3

if [ -d "./jthread" ]; then
    rm -rf jthread
fi
mkdir jthread
cd jthread
installPath=$(pwd)


cd ${libPath}
if [ -d "./build" ]; then
    rm -rf build
fi
mkdir build
cd build

cmake -DCMAKE_INSTALL_PREFIX=${installPath} ..
make
make install

cd ..

build_jrtp.sh

其内容为:

#!/bin/bash
currentPath=$(pwd)

libPath=$(pwd)/../jrtplib-3.11.2


if [ -d "./jrtplib" ]; then
    rm -rf jrtplib
fi
mkdir jrtplib
cd jrtplib
installPath=$(pwd)


cd ${libPath}
if [ -d "./build" ]; then
    rm -rf build
fi
mkdir build
cd build

cmake -DCMAKE_INSTALL_PREFIX=${installPath} ..
make
make install

cd ..


修改jrtplib-3.11.2中的CMakeLists.txt文件第37行,加上路径。

find_package(JThread PATHS  /home/oceanstar/CLionProjects/jrtp_test/lib/export/jthread)

开始编译

cd  export
chmod 777 *.sh
./build_jthread.sh
./build_jrtp.sh

在这里插入图片描述
置于对应的makefile怎么写,可以看
在这里插入图片描述

方法二(不推荐)

编译JTHREAD

将下载的压缩包解压后进入jthread-1.3.3目录中

cd jthread-1.3.3
mkdir build
cd build
cmake ../ -DCMAKE_INSTALL_PREFIX=../../export 
make 
make install   

在这里插入图片描述

我们来看下应该怎么引用生成的库和头。 在build下有个pkgconfig,下面生成了一个jthread.pc

在这里插入图片描述

编译jrtplib

将下载的源码解压缩

cd jrtplib-3.11.2
mkdir build
cd build
cmake ../ -DCMAKE_INSTALL_PREFIX=../../export
make && make install

在这里插入图片描述

使用

工程结构

在这里插入图片描述

最外层CMakeLists.txt

cmake_minimum_required(VERSION 3.16)
project(jrtp_test)

set(CMAKE_CXX_STANDARD 14)
add_subdirectory (src)

src目录下的CMakeLists.txt

include_directories(
        ${CMAKE_SOURCE_DIR}/include
        ${CMAKE_SOURCE_DIR}/lib/export/jrtplib/include/jrtplib3
        ${CMAKE_SOURCE_DIR}/lib/export/jthread/include/jthread
)

link_directories(
        ${CMAKE_SOURCE_DIR}/lib/export/jrtplib/lib
        ${CMAKE_SOURCE_DIR}/lib/export/jthread/lib
)

add_definitions("-Wall -g")
aux_source_directory(. SRC_LIST)
add_executable(${PROJECT_NAME}  ${SRC_LIST} )
set (EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)


target_link_libraries( ${PROJECT_NAME}  -ljrtp -ljthread -lpthread )




实例一

必须先进入命名空间:

using namespace jrtplib;

初始化

(1)创建一个RTPSession对象:
在使用jrtplib进行实时流媒体数据传输之前,首先应该生成RTPSession 类的一个实例来表示此次RTP会话,类似于全局上下文句柄

RTPSession rtpSession;

(2)然后调用 Create() 方法来对初始化一个会话。

  • 要真正创建会话,必须调用create成员函数,该函数接受三个参数:
    • 第一个参数是RTPSessionParams类型,用于指定会话的常规选项。
      • 必须显式设置[此类的要发送的数据的时间戳单位,即调用SetOwnTimestampUnit],否则将无法成功创建会话
      • 其他会话参数可能取决于您打算使用的实际RTP配置文件。
    • 第二个参数是指向RTPTransimissionParams实例的指针,并描述传输组件的参数
      • 可以SetPortbase指定接收数据的端口,注意端口不能是奇数,否者运行时会出现错误:
    • 第三个参数选择要使用的传输组件的类型。默认情况下,使用UDP / IPv4传输器,对于这个特定的传输器,传输参数应该是RTPUDPv4TransmissionParams类型。
      (3)处理返回值:
  • 如果RTP会话创建过程失败,Create()方法将会返回一个负数,通过它虽然可以很容易地判断出函数调用究竟是成功的还是失败的,但却很难明白出错的原因到底什么。
  • JRTPLIB采用了统一的错误处理机制,它提供的所有函数如果返回负数就表明出现了某种形式的错误,而具体的出错信息则可以通过调用 RTPGetErrorString()函数得到。RTPGetErrorString()函数将错误代码作为参数传入,然后返回该错误代码所对应的错误信息。
#include <stdio.h>
#include "rtpsessionparams.h"
#include "rtpudpv4transmitter.h"
#include "rtpsession.h"

using namespace jrtplib;



int main(void)
{
    double	tsunit;
    int ListenPort;
    RTPSession rtpSession;
    RTPSessionParams           SessParams;
    RTPUDPv4TransmissionParams TransParams;


    tsunit = 1.0/8000.0;                       /* 1/8000表示1秒钟采样8000次,即录音时的8KHz*/
    SessParams.SetOwnTimestampUnit(tsunit);    // 时间戳:1秒钟8000个样本
    SessParams.SetAcceptOwnPackets(true);  // 设置是否接收属于本身的数据,true-接收,false-不接收

    ListenPort = 5440;
    TransParams.SetPortbase(ListenPort);   // 设置本地接收的端口号
    int iErrNum = rtpSession.Create(SessParams, &TransParams);
    std::string RtpError = RTPGetErrorString(iErrNum);
    if (iErrNum < 0){
        printf( "Create RTP Session error! Reason: %s!\r\n", RtpError.c_str() );
        return -1;
    }
    printf( "Create RTP Session OK! Reason: %s!\r\n", RtpError.c_str() );

    return 0;
}

在这里插入图片描述

数据发送

当RTP会话成功建立起来之后,接下来就可以进行流媒体数据的实时传输了。

(1)首先要设置RTP和RTCP数据应该发送到哪个目的地

  • RTP协议允许同一会话存在多个目标地址,这可以通过调用RTPSession类的AddDestination()、 DeleteDestination()和ClearDestinations()方法来完成。

  • 此函数接受RTPAddress类型的参数。这是一个抽象类,对于IPv4上的UDP发送器,实际使用的类是RTPIPv4Address。

  • 例如,下面的语句表示的是让RTP会话将数据发送到本地主机的6000端口:

    char destIp [16] = "127.0.0.1";
    int destPort = 10000;
    RTPIPv4Address addr(ntohl(inet_addr(destIp)), destPort);
    iErrNum = rtpSession.AddDestination(addr);
    if (iErrNum < 0)
    {
        printf( "rtpSession AddDestination error! Reason: %s!\r\n",  RTPGetErrorString(iErrNum).c_str() );
        exit(-1);
    }
    printf( "rtpSession AddDestination ok! Reason: %s!\r\n",  RTPGetErrorString(iErrNum).c_str() );

(2)目标地址全部指定之后,接着就可以调用 RTPSession 类的 SendPacket() 方法,向所有的目标地址发送流媒体数据。

	int SendPacket(const void *data,size_t len);
	int SendPacket(const void *data,size_t len,
	                uint8_t pt,bool mark,uint32_t timestampinc);
  • SendPacket()最典型的用法是类似于下面的语句,其中第一个参数是要被发送的数据,而第二个参数则指明将要发送数据的长度,再往后依次是RTP负载类型、标识和时戳增量。
sess.SendPacket(buffer, 5, 0, false, 10);
  • 对于同一个 RTP 会话来讲,负载类型、标识和时戳增量通常来讲都是相同的,JRTPLIB 允许将它们设置为会话的默认参数,这是通过调用 RTPSession 类的 SetDefaultPayloadType()、SetDefaultMark() 和SetDefaultTimeStampIncrement() 方法来完成的。
    session.SetDefaultPayloadType(96);    //希望使用负载类型96
    session.SetDefaultMark(false);  //想要指出何时收到了来自其他人的数据包
    session.SetDefaultTimestampIncrement(160);  //假设在一分钟的时间内,我们想要发送包含20毫秒(或160个样本)的数据包
  • 之后在进行数据发送时只需指明要发送的数据及其长度就可以了:
    sess.SendPacket(buffer, 5);

接收数据

  • 如果库是在JThread支持下编译的,那么传入的数据将在后台处理。
  • 如果在编译时没有启用JThread支持,或者在会话参数中指定不应该使用轮询线程,那么必须定期调用RTPSession成员函数poll来处理传入数据,并在必要时发送RTCP数据。

方法1: 自己使用PollData方法来接收发送过来的RTP或者 RTCP数据报。(不推荐)

在调用session.Poll()函数时,程序报错This function is not available when using the RTP poll thread feature。

  • 网上搜索说是防火墙问题,导致数据未收到,经检查我的防火墙一直是关着的,跟这个应该没关系。
  • 注意到jrtplib源码中的example1示例,其中当需要调用Poll函数(对于流媒体数据的接收端,首先需要调用 RTPSession 类的 PollData() 方法来接收发送过来的 RTP 或者RTCP 数据报。JRTPLIB-3.7中修改PollData()方法为Poll(),使用都一样)时,会有个宏定义,最后发现应该是与JThread这个库有关,当你使用这个库的时候,就不需要你自己去主动调用Poll函数,而是JThread会帮你做。所以如果不使用JThread那么,就需要使用Poll函数,可以测试下不安装JThread的情况下是否就不会报错了。而我这边是安装并使用JThread的,所以代码中不再需要自己手动去Poll,所以不调用该函数即可!

方法2:传入的数据将在后台处理。

  • 在调用RTPSession成员函数RTPSession::BeginDataAccess和RTPSession::EndDataAccess之间,可以完成有关会话参与者、数据包检索等的信息。这可以确保后台线程不会试图更改您试图访问的数据。
  • 由于同一个 RTP 会话中允许有多个参与者(源),你既可以通过调用 RTPSession 类的GotoFirstSource() 和 GotoNextSource() 方法来遍历所有的源,也可以通过调用 RTPSession 类的GotoFirstSourceWithData() 和 GotoNextSourceWithData() 方法来遍历那些携带有数据的源。
  • 在从 RTP 会话中检测出有效的数据源之后,接下去就可以调用 RTPSession 类的 GetNextPacket() 方法从中抽取 RTP 数据报,有的话返回非NULL,获取数据长度和收到的数据,可对数据进行处理,当接收到的 RTP 数据报处理完之后,一定要记得需要调用DeletePacket及时释放。
    // 开始接收数据
    rtpSession.BeginDataAccess();
    if (rtpSession.GotoFirstSource())
    {
        do
        {
            RTPPacket *packet;
            while ((packet = rtpSession.GetNextPacket()) != 0)
            {

                // 获取接收数据长度
                unsigned int recvSize = packet->GetPayloadLength();
                // 获取接收数据
                unsigned char * recvData = (unsigned char *)packet->GetPayloadData();
                std::cout << "Got packet with extended sequence number "
                          << packet->GetExtendedSequenceNumber()
                          << " from SSRC " << packet->GetSSRC()  << "; recvSize " << recvSize << "[" << recvData << "]"
                          << std::endl;
                // 删除数据包
                rtpSession.DeletePacket(packet);
            }
        } while (rtpSession.GotoNextSource());
    }
    rtpSession.EndDataAccess();

  • JRTPLIB 为 RTP 数据报定义了三种接收模式,其中每种接收模式都具体规定了哪些到达的 RTP 数据报将会被接受,而哪些到达的 RTP 数据报将会被拒绝。通过调用 RTPSession 类的 SetReceiveMode() 方法可以设置下列这些接收模式:
    • RECEIVEMODE_ALL  缺省的接收模式,所有到达的 RTP 数据报都将被接受;
    • RECEIVEMODE_IGNORESOME  除了某些特定的发送者之外,所有到达的 RTP 数据报都将被接受,而被拒绝的发送者列表可以通过调用 AddToIgnoreList()、DeleteFromIgnoreList() 和 ClearIgnoreList() 方法来进行设置;
    • RECEIVEMODE_ACCEPTSOME  除了某些特定的发送者之外,所有到达的 RTP 数据报都将被拒绝,而被接受的发送者列表可以通过调用 AddToAcceptList ()、DeleteFromAcceptList 和 ClearAcceptList () 方法来进行设置。 下面是采用第三种接收模式的程序示例。
 rtpSession.BeginDataAccess();
    if (rtpSession.GotoFirstSourceWithData()) {
        do {
            //rtpSession.AddToAcceptList(&addresss);
            rtpSession.SetReceiveMode(RTPTransmitter::ReceiveMode::AcceptAll);

            RTPPacket *pack;
            pack = rtpSession.GetNextPacket();            // 处理接收到的数据    
            delete pack;   }
        while (rtpSession.GotoNextSourceWithData());
    }
    rtpSession.EndDataAccess();

控制信息

  • JRTPLIB 是一个高度封装后的RTP库,程序员在使用它时很多时候并不用关心RTCP数据报是如何被发送和接收的,因为这些都可以由JRTPLIB自己来完成。 只要 Poll()或者SendPacket()方法被成功调用,JRTPLIB就能够自动对到达的 RTCP数据报进行处理,并且还会在需要的时候发送RTCP数据报,从而能够确保整个RTP会话过程的正确性。
  • 而另一方面,通过调用RTPSession类提供的SetLocalName()、SetLocalEMail()、 SetLocalLocation()、SetLocalPhone()、SetLocalTool()和SetLocalNote()方法, JRTPLIB又允许程序员对RTP会话的控制信息进行设置。所有这些方法在调用时都带有两个参数,其中第一个参数是一个char型的指针,指向将要被设置的数据;而第二个参数则是一个int型的数值,表明该数据中的前面多少个字符将会被使用。例如下面的语句可以被用来设置控制信息中的电子邮件地址:
rtpSession.SetLocalEMail("xiaowp@linuxgam.comxiaowp@linuxgam.com",19);
  • 在RTP 会话过程中,不是所有的控制信息都需要被发送,通过调用RTPSession类提供的 EnableSendName()、EnableSendEMail()、EnableSendLocation()、EnableSendPhone ()、EnableSendTool()和EnableSendNote()方法,可以为当前RTP会话选择将被发送的控制信息。

销毁

发送退出记得释放内存即可,但是接收退出有两点要注意:

  • 第一点是若是开始接收数据BeginDataAccess一定要调用EndDataAccess否则不会关掉jthread线程,不会马上退出,退出不了也就无法重新Create

  • 第二点是接收了数据包则一定要调用DeletePacket数据包,然后调用销毁和等待退出,只要调用了EndDataAccess,AboutWait基本上是立即返回的,秒开秒关。

rtpSession.Destroy();
rtpSession.AbortWait();

整体代码

  • 发送端
#include <stdio.h>
#include "rtpsessionparams.h"
#include "rtpudpv4transmitter.h"
#include "rtpsession.h"

using namespace jrtplib;



int main(void)
{
    char destIp [16] = "127.0.0.1";
    int destPort = 10000;

    RTPSession rtpSession;
    RTPSessionParams           SessParams;
    RTPUDPv4TransmissionParams TransParams;
    SessParams.SetOwnTimestampUnit(1.0/8000.0);    // 时间戳:1秒钟8000个样本
    int iErrNum = rtpSession.Create(SessParams, &TransParams);
    if (iErrNum < 0){
        printf( "Create RTP Session error! Reason: %s!\r\n",  RTPGetErrorString(iErrNum).c_str() );
        exit(-1);
    }
    printf( "Create RTP Session OK! Reason: %s!\r\n", RTPGetErrorString(iErrNum).c_str() );


    // 指定RTP数据接收端
    RTPIPv4Address addr(ntohl(inet_addr(destIp)), destPort);
    iErrNum = rtpSession.AddDestination(addr);
    if (iErrNum < 0)
    {
        printf( "rtpSession AddDestination error! Reason: %s!\r\n",  RTPGetErrorString(iErrNum).c_str() );
        exit(-1);
    }
    printf( "rtpSession AddDestination ok! Reason: %s!\r\n",  RTPGetErrorString(iErrNum).c_str() );

    // 设置RTP会话默认参数
    rtpSession.SetDefaultPayloadType(0);
    rtpSession.SetDefaultMark(false);
    rtpSession.SetDefaultTimestampIncrement(0);

    // 发送流媒体数据
    char buffer[128];
    int index = 1;
    do {
        sprintf(buffer, "%d: RTP packet", index ++);
        rtpSession.SendPacket(buffer, strlen(buffer));
        printf("Send packet [%s]!\n", buffer);
    } while(1);


    rtpSession.Destroy();
    rtpSession.AbortWait();

    return 0;
}
  • 接收端
#include <stdio.h>
#include <iostream>
#include "rtpsessionparams.h"
#include "rtpudpv4transmitter.h"
#include "rtpsession.h"
#include "rtppacket.h"
using namespace jrtplib;



int main(void)
{
    int localPort = 10000;

    RTPSession rtpSession;
    RTPSessionParams           SessParams;
    RTPUDPv4TransmissionParams TransParams;
    SessParams.SetOwnTimestampUnit(1.0/8000.0);    // 时间戳:1秒钟8000个样本
    TransParams.SetPortbase(localPort);   // 设置本地接收的端口号
    int iErrNum = rtpSession.Create(SessParams, &TransParams);
    if (iErrNum < 0){
        printf( "Create RTP Session error! Reason: %s!\r\n",  RTPGetErrorString(iErrNum).c_str() );
        exit(-1);
    }
    printf( "Create RTP Session OK! Reason: %s!\r\n", RTPGetErrorString(iErrNum).c_str() );


    // 开始接收数据
    rtpSession.BeginDataAccess();
    if (rtpSession.GotoFirstSource())
    {
        do
        {
            RTPPacket *packet;
            while ((packet = rtpSession.GetNextPacket()) != 0)
            {

                // 获取接收数据长度
                unsigned int recvSize = packet->GetPayloadLength();
                // 获取接收数据
                unsigned char * recvData = (unsigned char *)packet->GetPayloadData();
                std::cout << "Got packet with extended sequence number "
                          << packet->GetExtendedSequenceNumber()
                          << " from SSRC " << packet->GetSSRC()  << "; recvSize " << recvSize << "[" << recvData << "]"
                          << std::endl;
                // 删除数据包
                rtpSession.DeletePacket(packet);
            }
        } while (rtpSession.GotoNextSource());
    }
    rtpSession.EndDataAccess();

    rtpSession.Destroy();
    rtpSession.AbortWait();

    return 0;
}

在这里插入图片描述
在这里插入图片描述

实例二

下面是官方例程的实例一:向指定目的地发送数据包。注意我的库是支持jthread的

#include "rtpsession.h"
#include "rtpudpv4transmitter.h"
#include "rtpipv4address.h"
#include "rtpsessionparams.h"
#include "rtperrors.h"
#include "rtplibraryversion.h"
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <string>

using namespace jrtplib;

void checkerror(int rtperr)
{
    if (rtperr < 0)
    {
        std::cout << "ERROR: " << RTPGetErrorString(rtperr) << std::endl;
        exit(-1);
    }
}


int main(void)
{
    RTPSession sess;
    uint16_t portbase,destport;
    uint32_t destip;
    std::string ipstr;
    int status,i,num;

    std::cout << "Using version " << RTPLibraryVersion::GetVersion().GetVersionString() << std::endl;

    std::cout << "Enter local portbase:" << std::endl;
    std::cin >> portbase;
    std::cout << std::endl;

    std::cout << "Enter the destination IP address" << std::endl;
    std::cin >> ipstr;
    destip = inet_addr(ipstr.c_str());
    if (destip == INADDR_NONE){
        std::cerr << "Bad IP address specified" << std::endl;
        return -1;
    }

    //inet_addr函数以网络字节顺序返回一个值,但我们需要按主机字节顺序的IP地址,所以我们使用对ntohl的调用
    destip = ntohl(destip);

    std::cout << "Enter the destination port" << std::endl;
    std::cin >> destport;

    std::cout << std::endl;
    std::cout << "Number of packets you wish to be sent:" << std::endl;
    std::cin >> num;

    RTPUDPv4TransmissionParams transparams;
    RTPSessionParams sessparams;

    重要提示:必须设置本地时间戳单位,否则RTCP发送方报告信息将计算错误。
    /// 在这种情况下,我们将每秒发送10个样本,因此我们将时间戳单位设置为(1.0/10.0)
    sessparams.SetOwnTimestampUnit(1.0/10.0);

    sessparams.SetAcceptOwnPackets(true);
    transparams.SetPortbase(portbase);
    status = sess.Create(sessparams,&transparams);
    checkerror(status);

    RTPIPv4Address addr(destip,destport);

    status = sess.AddDestination(addr);
    checkerror(status);

    for (i = 1 ; i <= num ; i++)
    {
        printf("\nSending packet %d/%d\n",i,num);

        // send the packet
        status = sess.SendPacket((void *)"1234567890",10,0,false,10);
        checkerror(status);

        sess.BeginDataAccess();

        // check incoming packets
        if (sess.GotoFirstSourceWithData())
        {
            do
            {
                RTPPacket *pack;

                while ((pack = sess.GetNextPacket()) != NULL)
                {
                    // You can examine the data here
                    printf("Got packet !\n");

                    // we don't longer need the packet, so
                    // we'll delete it
                    sess.DeletePacket(pack);
                }
            } while (sess.GotoNextSourceWithData());
        }

        sess.EndDataAccess();

#ifndef RTP_SUPPORT_THREAD
        status = sess.Poll();
		checkerror(status);
#endif // RTP_SUPPORT_THREAD

        RTPTime::Wait(RTPTime(1,0));
    }

    sess.BYEDestroy(RTPTime(10,0),0,0);

    return 0;
}

在这里插入图片描述

怎么开发一个基于RTP协议的流媒体播放器

如何实现发送

我们需要创建一个RTPSession的发送对象,然后初始化相关的参数:

	RTPSession session;
 
	RTPSessionParams sessionparams;
	sessionparams.SetOwnTimestampUnit(1.0 / 90000.0);
	sessionparams.SetAcceptOwnPackets(true);
 
	RTPUDPv4TransmissionParams transparams;
	transparams.SetPortbase(8000); //这个端口必须未被占用
 
	int status = session.Create(sessionparams, &transparams);
	if (status < 0)
	{
		//std::cerr << RTPGetErrorString(status) << std::endl;
		return - 1;
	}
 
#if 1
	RTPIPv4Address addr(ntohl(inet_addr(m_szDestIP)), m_nDestPort);
	status = session.AddDestination(addr);
#else
	unsigned long addr = ntohl(inet_addr(m_szDestIP));
	status = session.AddDestination(addr, m_nDestPort);
#endif
	if (status < 0)
	{
		//std::cerr << RTPGetErrorString(status) << std::endl;
		return -2;
	}
 
	session.SetDefaultPayloadType(96);
	session.SetDefaultMark(false);
	session.SetDefaultTimestampIncrement(90000.0 / 25.0);

这里初始化的参数包括RTP头的Payload类型(赋值为96),时间单位(1.0/90000.0),时间戳增量(90000/25=3600),以及Rtp头的MarkerBit的默认值。

接着读取一个视频文件,每次读1K字节,然后调用jrtplib的RTPSession::SendPacket函数发送数据:

	FILE *fp_open;
	uint8_t buff[1024 * 5] = { 0 };
	DWORD  bufsize = 1024; //每次读1024字节,不超过1400就行
 
	DWORD dwReadBytesPerSec = 2*1024*1024/8; //读取速度
	RTPTime delay(bufsize*1.0/ dwReadBytesPerSec);
 
	//读取文件
	fp_open = fopen(m_szFilePath, "rb");
	while (!feof(fp_open) && g_RTPSendThreadRun)
	{
		int true_size = fread(buff, 1, bufsize, fp_open);
 
		int status = session.SendPacket(buff, true_size);
 
		Sleep(1000* bufsize/dwReadBytesPerSec);
		//RTPTime::Wait(delay); //delay for a few milliseconds
	}

(注意:这里读文件数据只是简单地将文件数据块读出来然后直接发送,没有对视频帧做二次封装和处理,对于某些格式比如H264,一般要求要以NALU单元来传输,以FU-A分片方式打包,然后再封装到RTP包里面,而这里没有采取这种方式,大家要注意区分。)

如何实现接收

接收的实现较为复杂一些,用到了多线程技术和缓冲队列建议用到两条线程,一条用于接收RTP包,从中提取出视频数据;另一条线程用于解码视频,并把视频帧转成RGB格式后显示到窗口中。

  • 用到两条线程的好处是:可以并行接收和解码,两个工作相互独立,提高视频帧的处理效率,减少播放延时。
  • 而如果用一条线程来做,它既要接收又要解码,线程中处理一个帧的时间就长一些,而这时又不能接收数据,很可能造成后面的数据包丢掉。所以,用双线程的”分工合作“方式处理效率更高。
  • 两条线程之间需要维护一个队列,其中一条线程收到数据后放到队列里,然后另外一个线程从队列里读取数据,这是一个典型的”生产者-消费者“的模型,我们需要实现一个先入先出的队列来转运”视频帧“,这个队列的定义如下:
std::list<PacketNode_t>  m_packetList; //包列表

其中,PacketNode_t结构体的定义为:

typedef struct
{
	unsigned length;
	uint8_t *buf;
}PacketNode_t;

下面对接收线程和解码线程的工作流程作详细介绍。

首先,程序在接收前需要创建两个线程:

	g_RTPRecvThreadRun = true;
	g_decoding_thread_run = true;
 
	DWORD threadID = 0;
	m_hRecvThread   = CreateThread(NULL, 0, RTPRecvThread, this, 0, &threadID);
	m_hDecodeThread = CreateThread(NULL, 0, decoding_thread, this, 0, &threadID);

RTPRecvThread是RTP数据的接收线程,实现方式如下:

 
DWORD WINAPI RTPRecvThread(void* param)
{
	TRACE("RTPRecvThread began! \n");
 
	CPlayStreamDlg * pThisDlg = (CPlayStreamDlg*)param;
 
	RTPSession session;
	//WSADATA dat;
	//WSAStartup(MAKEWORD(2, 2), &dat);
 
	RTPSessionParams sessionparams;
	sessionparams.SetOwnTimestampUnit(1.0 / 90000.0);
	//sessionparams.SetAcceptOwnPackets(true);
 
	RTPUDPv4TransmissionParams transparams;
	transparams.SetPortbase(m_nRecvPort); //接收端口
 
	int oldBufSize = transparams.GetRTPReceiveBuffer();
	transparams.SetRTPReceiveBuffer(oldBufSize * 2);
	int status = session.Create(sessionparams, &transparams);
 
	int newBufSize = transparams.GetRTPReceiveBuffer();
	int oldBufSizec = transparams.GetRTCPReceiveBuffer();
	transparams.SetRTCPReceiveBuffer(oldBufSizec * 2);
	int newBufSizec = transparams.GetRTCPReceiveBuffer();
 
	while (g_RTPRecvThreadRun)
	{
		session.BeginDataAccess();
		if (session.GotoFirstSourceWithData())
		{
			do
			{
				RTPPacket *pack;
 
				while ((pack = session.GetNextPacket()) != NULL)
				{
					int nPayType = pack->GetPayloadType();
					int nLen = pack->GetPayloadLength();
					unsigned char *pPayData = pack->GetPayloadData();
					int nPackLen = pack->GetPacketLength();
					unsigned char *pPackData = pack->GetPacketData();
					int csrc_cont = pack->GetCSRCCount();
					int ssrc = pack->GetSSRC();
					int nTimestamp = pack->GetTimestamp();
					int nSeqNum = pack->GetSequenceNumber();
 
#if 0
					Writebuf((char*)pPayData, nLen);
#else			
					pThisDlg->m_cs.Lock();
					//if (pThisDlg->m_packetList.size() < MAX_PACKET_COUNT)
					{
						PacketNode_t  temNode;
						temNode.length = nLen;
						temNode.buf = new uint8_t[nLen];
						memcpy(temNode.buf, pPayData, nLen);
 
						pThisDlg->m_packetList.push_back(temNode); //存包列表
					}
					pThisDlg->m_cs.Unlock();
#endif
 
					session.DeletePacket(pack);
				}
			} while (session.GotoNextSourceWithData());
		}
		else
		{
			//Sleep(10);
		}
		session.EndDataAccess();
 
		Sleep(1);
	}
	session.Destroy();
 
	TRACE("RTPRecvThread end! \n");
	return 0;
}
  • 接收线程里创建了一个RTPSession对象,这个对象是用于接收RTP包,前面一部分代码用于初始化一些参数,包括:接收端口,时间戳单位,接收缓冲区大小。然后,进入一个循环,在里面不停地读取RTP数据包,如果session.GetNextPacket()返回的指针不为空,则表示读取到一个数据包,返回的指针变量是一个RTPPacket*类型,其指向的成员变量包括RTP头的各个字段的值,以及Payload数据的内存地址和大小。我们关键要提取出Payload的数据和大小,然后把它作为一个元素插入到缓冲队列中(如下面代码所示:)
pThisDlg->m_cs.Lock();
 
PacketNode_t  temNode;
temNode.length = nLen;
temNode.buf = new uint8_t[nLen];
memcpy(temNode.buf, pPayData, nLen);
 
pThisDlg->m_packetList.push_back(temNode); //存包列表
 
pThisDlg->m_cs.Unlock();

上面的接收线程实现了一个“生成者”,而“消费者”是实现在另外一个线程—decoding_thread,这个线程做的工作是解码。

  • 。这个线程调用了很多FFmpeg的函数,但基本的流程是:打开一个文件源或URL地址-》从源中读取各个流的信息-》初始化解码器-》解码和显示。
  • 因为我们是从网络中收数据,所以是一个网络源,从网络源中读取数据有两种方式:
    • 一种是用FFmpeg内置的协议栈的支持,比如RTSP/RTMP/RTP
    • 还有一种方式是我们传数据给FFmpeg,FFmpeg从内存中读取我们送的数据,然后用它的Demuxer和Parser来进行分析,分离出视频和音频。
  • 这里程序使用的是第二种方式,即从网络中探测数据,然后送数据给FFmpeg去解析。探测网络数据需要调用FFmpeg的av_probe_input_buffer函数,这个函数要传入一个内存缓冲区地址和一个回调函数指针,其中回调函数是用来从网络中读数据的(即我们放到缓冲队列里的数据包)。下面的fill_iobuffer就是读数据的回调函数,而pIOBuffer指向用于存放读取数据的缓冲区地址,FFmpeg就是从这里读取数据。
	pIObuffer = (uint8_t*)av_malloc(4096);
	pb = avio_alloc_context(
		pIObuffer,
		4096,
		0,
		param,
		fill_iobuffer,
		NULL,
		NULL);
 
	if (av_probe_input_buffer(pb, &piFmt, "", NULL, 0, 0) < 0)//探测从内存中获取到的媒体流的格式
	{
		TRACE("Error: probe format failed\n");
		return -1;
	}
	else {
		TRACE("input format:%s[%s]\n", piFmt->name, piFmt->long_name);
 
	}
  • 回调函数fill_iobuffer调用了一个ReadBuf的函数:
int fill_iobuffer(void* opaque, uint8_t* buf, int bufSize)
{
	ASSERT(opaque != NULL);
	CPlayStreamDlg* p_CPSDecoderDlg = (CPlayStreamDlg*)opaque;
 
	//TRACE("ReadBuf----- \n");
	int nBytes = ReadBuf((char*)buf, bufSize, (void*)p_CPSDecoderDlg);
	return (nBytes > 0) ? bufSize : -1;
}
static int ReadBuf(char* data, int len, void* pContext)
{
	CPlayStreamDlg * pThisDlg = (CPlayStreamDlg*)pContext;
 
	int data_to_read = len;
	char * pReadPtr = data;
 
	while (g_RTPRecvThreadRun)
	{
		int nRead = pThisDlg->ReadNetPacket((uint8_t*)pReadPtr, data_to_read);
		if (nRead < 0)
		{
			Sleep(10);
			continue;
		}
		pReadPtr += nRead;
		data_to_read -= nRead;
		if (data_to_read > 0)
		{
			Sleep(10);
			continue;
		}
		break;
	}
 
	return (data_to_read > 0) ? -1 : len;
}
  • ReadBuf函数的作用就不用解释了,大家一看就明白了。它实现了一个我们前面说的“消费者”,从前面实现的缓冲队列中读取数据包,读取之后就会从队列中删除相应的元素。如果队列不为空,则直接从前面的元素读取;如果无数据,则继续等待。
  • 读了视频帧数据之后,就到了解码,解码的代码如下:
	while (g_decoding_thread_run)
	{
		av_read_frame(pFormatContext, pAVPacket);
		if(pAVPacket->stream_index == video_stream_index)
		{
			avcodec_decode_video2(pCodecCtx, pFrame, &got_picture, pAVPacket);
			if(got_picture)
			{
				p_uint8_t_temp = pFrame->data[1];
				pFrame->data[1] = pFrame->data[2];
				pFrame->data[2] = p_uint8_t_temp;
				pFrame->data[0] += pFrame->linesize[0] * (pCodecCtx->height - 1);
				pFrame->linesize[0] *= -1;
				pFrame->data[1] += pFrame->linesize[1] * (pCodecCtx->height / 2 - 1);
				pFrame->linesize[1] *= -1;
				pFrame->data[2] += pFrame->linesize[2] * (pCodecCtx->height / 2 - 1);
				pFrame->linesize[2] *= -1;
				got_picture = sws_scale(img_convert_ctx, pFrame->data, pFrame->linesize, 0, pCodecCtx->height, RGB24Data, RGB24Linesize);
				got_picture = StretchDIBits(hDC, 0, 0, PlayingWidth, PlayingHeight, 0, 0, pCodecCtx->width, pCodecCtx->height, RGB24Data[0], (BITMAPINFO*)&bmpinfo, DIB_RGB_COLORS, SRCCOPY);
			}
		}
 
		av_free_packet(pAVPacket);
	}
  • FFmpeg从解码器输出的格式是YUV的,我们要转成RGB图像格式显示,所以调用了sws_scale函数来转换,最后调用Windows GDI函数—StretchDiBits来把图像显示到指定的窗口区域。
  • 如果要停止解码,则退出线程的时候记得要释放FFmpeg创建的资源:
	if (pFormatContext)
	{
		avformat_close_input(&pFormatContext);
		pFormatContext = NULL;
	}
 
	sws_freeContext(img_convert_ctx);
	av_freep(&RGB24Data[0]);
	av_frame_free(&pFrame);
	//avcodec_close(pCodecCtx);
	//av_free(pIObuffer); //调用了avformat_close_input会自动释放pIObuffer
	ReleaseDC(hwnd, hDC);

到此为止,一个简单的流媒体播放器的实现过程就介绍完了。

参考

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐