
Live555 Study Notes (3): How an RTSP Connection Is Established (RTSP Server Side)


In the previous post we briefly analyzed the main function of testOnDemandRTSPServer.cpp; its main steps are to create an RTSPServer, create ServerMediaSession objects, and then wait for RTSP clients to connect. Next we will analyze in detail how Live555 establishes an RTSP connection. First, a quick look at how an RTSP connection is set up at the protocol level:

1. (Optional)
   RTSP client -> RTSP server:  OPTIONS request   (asks which methods the server supports)
   RTSP server -> RTSP client:  OPTIONS response  (lists the methods the server supports)

2. (Optional)
   RTSP client -> RTSP server:  DESCRIBE request  (asks for the description of a media resource, represented in Live555 by a ServerMediaSession)
   RTSP server -> RTSP client:  DESCRIBE response (returns the description of the media resource, i.e. the SDP)

3. (Required)
   RTSP client -> RTSP server:  SETUP request     (asks to set up a stream for the media resource)
   RTSP server -> RTSP client:  SETUP response    (returns the result of setting up the stream)

4. (Required)
   RTSP client -> RTSP server:  PLAY request      (asks to start playback of the media resource)
   RTSP server -> RTSP client:  PLAY response     (returns the result of the PLAY request)

-------------------- the RTSP server then sends RTP packets (carrying the media data) to the RTSP client --------------------
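For concreteness, here is what the optional OPTIONS/DESCRIBE stage looks like on the wire (a hand-written illustration; the URL, CSeq values and content length are made up):

C->S: OPTIONS rtsp://127.0.0.1:8554/test.264 RTSP/1.0
      CSeq: 1

S->C: RTSP/1.0 200 OK
      CSeq: 1
      Public: OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, GET_PARAMETER, SET_PARAMETER

C->S: DESCRIBE rtsp://127.0.0.1:8554/test.264 RTSP/1.0
      CSeq: 2
      Accept: application/sdp

S->C: RTSP/1.0 200 OK
      CSeq: 2
      Content-Base: rtsp://127.0.0.1:8554/test.264/
      Content-Type: application/sdp
      Content-Length: 437
      (SDP body follows)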

We start from the RTSPServer::incomingConnectionHandlerRTSP function, which calls RTSPServer::incomingConnectionHandler. That function accepts the client's TCP connection and then calls RTSPServer::createNewClientConnection to create an RTSPClientConnection instance, which represents one RTSP connection with a client.
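Condensed, the accept logic looks like this (a paraphrased sketch of the Live555 source, with error handling trimmed):

void RTSPServer::incomingConnectionHandler(int serverSocket) {
  struct sockaddr_in clientAddr;
  SOCKLEN_T clientAddrLen = sizeof clientAddr;
  int clientSocket = accept(serverSocket, (struct sockaddr*)&clientAddr, &clientAddrLen);
  if (clientSocket < 0) return; // accept() failed

  makeSocketNonBlocking(clientSocket);
  increaseSendBufferTo(envir(), clientSocket, 50*1024);

  // Create a new object for handling this RTSP connection:
  (void)createNewClientConnection(clientSocket, clientAddr);
}

The RTSPClientConnection constructor that this calls is shown next: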

RTSPServer::RTSPClientConnection
::RTSPClientConnection(RTSPServer& ourServer, int clientSocket, struct sockaddr_in clientAddr)
  : fOurServer(ourServer), fIsActive(True),
    fClientInputSocket(clientSocket), fClientOutputSocket(clientSocket), fClientAddr(clientAddr),
    fRecursionCount(0), fOurSessionCookie(NULL) {
  // Add ourself to our 'client connections' table (i.e. register this RTSPClientConnection with the RTSPServer):
  fOurServer.fClientConnections->Add((char const*)this, this);

  // Arrange to handle incoming requests:
  resetRequestBuffer();
  envir().taskScheduler().setBackgroundHandling(fClientInputSocket, SOCKET_READABLE|SOCKET_EXCEPTION,
                        (TaskScheduler::BackgroundHandlerProc*)&incomingRequestHandler, this);
}

In the RTSPClientConnection constructor, the connection adds itself to the RTSPServer's connection table, registers the client socket with the scheduler's socket set together with the callback handler incomingRequestHandler, and then waits for the client to send a request. Whenever the server receives a request on this connection, RTSPClientConnection::incomingRequestHandler is called back to handle it.

RTSPClientConnection::incomingRequestHandler in turn calls RTSPClientConnection::incomingRequestHandler1, which reads data from the client socket into the RTSPClientConnection::fRequestBuffer array and then calls RTSPClientConnection::handleRequestBytes to process what was read. handleRequestBytes is fairly long: it parses the received bytes, extracts the command name and other fields, dispatches each command to its handler function, stores the result in the fResponseBuffer array, and sends it back to the client. Here we assume the client skips the OPTIONS command and directly sends a DESCRIBE request, so handleRequestBytes calls RTSPClientConnection::handleCmd_DESCRIBE, which we will look at below. First, a word about urlPreSuffix and urlSuffix: suppose the client requests the RTSP URL rtsp://127.0.0.1:8554/test1/test2/test.264. Then urlPreSuffix is the part after "ip:port" (excluding the "/" that follows it) up to, but not including, the last "/", i.e. test1/test2, and urlSuffix is the part after the last "/", i.e. test.264.
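The split can be reproduced in a few lines (a hypothetical helper, not part of Live555, just to illustrate the convention):

#include <stdio.h>
#include <string.h>

int main() {
  const char* afterHostPort = "test1/test2/test.264"; // what follows "rtsp://127.0.0.1:8554/"
  const char* lastSlash = strrchr(afterHostPort, '/');

  if (lastSlash != NULL) {
    printf("urlPreSuffix: %.*s\n", (int)(lastSlash - afterHostPort), afterHostPort); // "test1/test2"
    printf("urlSuffix: %s\n", lastSlash + 1);                                        // "test.264"
  } else {
    printf("urlPreSuffix: \nurlSuffix: %s\n", afterHostPort); // no '/': everything is the suffix
  }
  return 0;
}

With that in mind, here is handleCmd_DESCRIBE: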

void RTSPServer::RTSPClientConnection
::handleCmd_DESCRIBE(char const* urlPreSuffix, char const* urlSuffix, char const* fullRequestStr) {
  char* sdpDescription = NULL;
  char* rtspURL = NULL;
  do {
    char urlTotalSuffix[RTSP_PARAM_STRING_MAX];
    if (strlen(urlPreSuffix) + strlen(urlSuffix) + 2 > sizeof urlTotalSuffix) {
      handleCmd_bad();
      break;
    }
    urlTotalSuffix[0] = '\0'; // concatenate urlPreSuffix and urlSuffix into urlTotalSuffix
    if (urlPreSuffix[0] != '\0') {
      strcat(urlTotalSuffix, urlPreSuffix);
      strcat(urlTotalSuffix, "/");
    }
    strcat(urlTotalSuffix, urlSuffix);

    if (!authenticationOK("DESCRIBE", urlTotalSuffix, fullRequestStr)) break;

    // We should really check that the request contains an "Accept:" #####
    // for "application/sdp", because that's what we're sending back #####

    // Begin by looking up the "ServerMediaSession" object for the specified "urlTotalSuffix":
    ServerMediaSession* session = fOurServer.lookupServerMediaSession(urlTotalSuffix); // look up the matching ServerMediaSession in the RTSPServer
    if (session == NULL) {
      handleCmd_notFound();
      break;
    }

    // Then, assemble a SDP description for this session:
    sdpDescription = session->generateSDPDescription(); // generate the SDP description string
    if (sdpDescription == NULL) {
      // This usually means that a file name that was specified for a
      // "ServerMediaSubsession" does not exist.
      setRTSPResponse("404 File Not Found, Or In Incorrect Format");
      break;
    }
    unsigned sdpDescriptionSize = strlen(sdpDescription);

    // Also, generate our RTSP URL, for the "Content-Base:" header
    // (which is necessary to ensure that the correct URL gets used in subsequent "SETUP" requests).
    rtspURL = fOurServer.rtspURL(session, fClientInputSocket);

    snprintf((char*)fResponseBuffer, sizeof fResponseBuffer, // build the response
         "RTSP/1.0 200 OK\r\nCSeq: %s\r\n"
         "%s"
         "Content-Base: %s/\r\n"
         "Content-Type: application/sdp\r\n"
         "Content-Length: %d\r\n\r\n"
         "%s",
         fCurrentCSeq,
         dateHeader(),
         rtspURL,
         sdpDescriptionSize,
         sdpDescription);
  } while (0);

  delete[] sdpDescription;
  delete[] rtspURL;
}

handleCmd_DESCRIBE mainly calls ServerMediaSession::generateSDPDescription to produce the SDP information; the ServerMediaSession's SDP is assembled from the SDP lines of each of its ServerMediaSubsessions, and the result is sent back to the client. Let us look at generateSDPDescription.

char* ServerMediaSession::generateSDPDescription() {
  AddressString ipAddressStr(ourIPAddress(envir()));
  unsigned ipAddressStrSize = strlen(ipAddressStr.val());

  // For a SSM sessions, we need a "a=source-filter: incl ..." line also:
  char* sourceFilterLine;
  if (fIsSSM) {
    char const* const sourceFilterFmt =
      "a=source-filter: incl IN IP4 * %s\r\n"
      "a=rtcp-unicast: reflection\r\n";
    unsigned const sourceFilterFmtSize = strlen(sourceFilterFmt) + ipAddressStrSize + 1;

    sourceFilterLine = new char[sourceFilterFmtSize];
    sprintf(sourceFilterLine, sourceFilterFmt, ipAddressStr.val());
  } else {
    sourceFilterLine = strDup("");
  }

  char* rangeLine = NULL; // for now
  char* sdp = NULL; // for now

  do {
    // Count the lengths of each subsession's media-level SDP lines.
    // (We do this first, because the call to "subsession->sdpLines()"
    // causes correct subsession 'duration()'s to be calculated later.)
    // First call each ServerMediaSubsession's sdpLines() just to compute the total SDP length:
    unsigned sdpLength = 0;
    ServerMediaSubsession* subsession;
    for (subsession = fSubsessionsHead; subsession != NULL;
         subsession = subsession->fNext) {
      char const* sdpLines = subsession->sdpLines();
      if (sdpLines == NULL) continue; // the media's not available
      sdpLength += strlen(sdpLines);
    }
    if (sdpLength == 0) break; // the session has no usable subsessions

    // Unless subsessions have differing durations, we also have a "a=range:" line:
    // Compute the ServerMediaSession's duration. The result determines the "a=range:" line, which in turn
    // decides whether the resource supports fast forward, rewind and random seeking. The session's duration
    // is derived from its subsessions' durations. ServerMediaSubsession::duration() returns 0 by default;
    // Live555 overrides it only for some container formats, e.g. MatroskaFileServerMediaSubsession parses
    // the playback duration of an MKV file.
    float dur = duration();
    if (dur == 0.0) {
      rangeLine = strDup("a=range:npt=0-\r\n");
    } else if (dur > 0.0) {
      char buf[100];
      sprintf(buf, "a=range:npt=0-%.3f\r\n", dur);
      rangeLine = strDup(buf);
    } else { // subsessions have differing durations, so "a=range:" lines go there
      rangeLine = strDup("");
    }

    char const* const sdpPrefixFmt =
      "v=0\r\n"
      "o=- %ld%06ld %d IN IP4 %s\r\n"
      "s=%s\r\n"
      "i=%s\r\n"
      "t=0 0\r\n"
      "a=tool:%s%s\r\n"
      "a=type:broadcast\r\n"
      "a=control:*\r\n"
      "%s"
      "%s"
      "a=x-qt-text-nam:%s\r\n"
      "a=x-qt-text-inf:%s\r\n"
      "%s";
    sdpLength += strlen(sdpPrefixFmt)
      + 20 + 6 + 20 + ipAddressStrSize
      + strlen(fDescriptionSDPString)
      + strlen(fInfoSDPString)
      + strlen(libNameStr) + strlen(libVersionStr)
      + strlen(sourceFilterLine)
      + strlen(rangeLine)
      + strlen(fDescriptionSDPString)
      + strlen(fInfoSDPString)
      + strlen(fMiscSDPLines);
    sdpLength += 1000; // in case the length of the "subsession->sdpLines()" calls below change
    sdp = new char[sdpLength];
    if (sdp == NULL) break;

    // Generate the SDP prefix (session-level lines):
    snprintf(sdp, sdpLength, sdpPrefixFmt,
         fCreationTime.tv_sec, fCreationTime.tv_usec, // o= <session id>
         1, // o= <version> // (needs to change if params are modified)
         ipAddressStr.val(), // o= <address>
         fDescriptionSDPString, // s= <description>
         fInfoSDPString, // i= <info>
         libNameStr, libVersionStr, // a=tool:
         sourceFilterLine, // a=source-filter: incl (if a SSM session)
         rangeLine, // a=range: line
         fDescriptionSDPString, // a=x-qt-text-nam: line
         fInfoSDPString, // a=x-qt-text-inf: line
         fMiscSDPLines); // miscellaneous session SDP lines (if any)

    // Then, add the (media-level) lines for each subsession:
    // Call each ServerMediaSubsession's sdpLines() again; this time the subsession's SDP lines
    // really are appended to the ServerMediaSession's SDP description:
    char* mediaSDP = sdp;
    for (subsession = fSubsessionsHead; subsession != NULL;
         subsession = subsession->fNext) {
      unsigned mediaSDPLength = strlen(mediaSDP);
      mediaSDP += mediaSDPLength; // advance the pointer
      sdpLength -= mediaSDPLength;
      if (sdpLength <= 1) break; // the SDP has somehow become too long

      char const* sdpLines = subsession->sdpLines();
      if (sdpLines != NULL) snprintf(mediaSDP, sdpLength, "%s", sdpLines);
    }
  } while (0);

  delete[] rangeLine; delete[] sourceFilterLine;
  return sdp;
}
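For reference, the SDP generated for a single-track .264 file looks roughly like this (a hand-written illustration; the session id, bitrate and sprop-parameter-sets values vary per file):

v=0
o=- 1566623456789012 1 IN IP4 192.168.1.10
s=H.264 Video, streamed by the LIVE555 Media Server
i=test.264
t=0 0
a=tool:LIVE555 Streaming Media v2019.08.16
a=type:broadcast
a=control:*
a=range:npt=0-
a=x-qt-text-nam:H.264 Video, streamed by the LIVE555 Media Server
a=x-qt-text-inf:test.264
m=video 0 RTP/AVP 96
c=IN IP4 0.0.0.0
b=AS:500
a=rtpmap:96 H264/90000
a=fmtp:96 packetization-mode=1;profile-level-id=42C01E;sprop-parameter-sets=Z0LAHtkDxWhAAAADAEAAAAwDxYuS,aMuMsg==
a=control:track1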

At this point the server has sent the requested SDP information back to the client and waits for the next command (SETUP). Before analyzing how the server handles SETUP, let us dig deeper into how the server obtains the SDP information in the first place. As generateSDPDescription shows, the work is done by each ServerMediaSubsession's sdpLines function, whose default implementation lives in OnDemandServerMediaSubsession. Here is OnDemandServerMediaSubsession::sdpLines:

char const* OnDemandServerMediaSubsession::sdpLines() {
  if (fSDPLines == NULL) {
    // We need to construct a set of SDP lines that describe this
    // subsession (as a unicast stream).  To do so, we first create
    // dummy (unused) source and "RTPSink" objects,
    // whose parameters we use for the SDP lines:
    // In other words, to obtain this ServerMediaSubsession's SDP lines we create "dummy" FramedSource
    // and RTPSink objects just to derive the SDP information; this is not real playback.
    unsigned estBitrate;
    // Create the FramedSource object that supplies the frame data. (This actually calls the subclass's
    // createNewStreamSource; for H264VideoFileServerMediaSubsession that creates a ByteStreamFileSource
    // wrapped in an H264VideoStreamFramer, both FramedSource subclasses.)
    FramedSource* inputSource = createNewStreamSource(0, estBitrate);
    if (inputSource == NULL) return NULL; // file not found

    struct in_addr dummyAddr;
    dummyAddr.s_addr = 0;
    Groupsock dummyGroupsock(envir(), dummyAddr, 0, 0);
    unsigned char rtpPayloadType = 96 + trackNumber()-1; // if dynamic
    // Create the RTPSink object that packetizes the data. (This actually calls the subclass's
    // createNewRTPSink; for H264VideoFileServerMediaSubsession that creates an H264VideoRTPSink,
    // a subclass of RTPSink.)
    RTPSink* dummyRTPSink
      = createNewRTPSink(&dummyGroupsock, rtpPayloadType, inputSource);
    if (dummyRTPSink != NULL && dummyRTPSink->estimatedBitrate() > 0) estBitrate = dummyRTPSink->estimatedBitrate();

    setSDPLinesFromRTPSink(dummyRTPSink, inputSource, estBitrate); // derive this subsession's SDP lines via the RTPSink
    Medium::close(dummyRTPSink);
    closeStreamSource(inputSource);
  }

  return fSDPLines;
}

Next we turn to OnDemandServerMediaSubsession::setSDPLinesFromRTPSink. In this function the dummy FramedSource and RTPSink objects are used to "play" part of the file so that the SDP information can be derived. This is a good place to sketch the overall playback flow of a Live555 RTSPServer: the server uses an RTPSink to build and hold RTP packets; the RTPSink repeatedly asks a FramedSource for frame data; when the FramedSource has obtained a frame, it invokes a callback that hands the data to the RTPSink; and in that callback the RTPSink sends the data to the client (it could equally be written to a local file, i.e. recording).

void OnDemandServerMediaSubsession
::setSDPLinesFromRTPSink(RTPSink* rtpSink, FramedSource* inputSource, unsigned estBitrate) {
  if (rtpSink == NULL) return;

  // Query the RTPSink for the various pieces of information about this ServerMediaSubsession;
  // the most involved one is obtaining the auxSDPLine:
  char const* mediaType = rtpSink->sdpMediaType();
  unsigned char rtpPayloadType = rtpSink->rtpPayloadType();
  AddressString ipAddressStr(fServerAddressForSDP);
  char* rtpmapLine = rtpSink->rtpmapLine();
  char const* rtcpmuxLine = fMultiplexRTCPWithRTP ? "a=rtcp-mux\r\n" : "";
  char const* rangeLine = rangeSDPLine();
  char const* auxSDPLine = getAuxSDPLine(rtpSink, inputSource);
  if (auxSDPLine == NULL) auxSDPLine = "";

  char const* const sdpFmt =
    "m=%s %u RTP/AVP %d\r\n"
    "c=IN IP4 %s\r\n"
    "b=AS:%u\r\n"
    "%s"
    "%s"
    "%s"
    "%s"
    "a=control:%s\r\n";
  unsigned sdpFmtSize = strlen(sdpFmt)
    + strlen(mediaType) + 5 /* max short len */ + 3 /* max char len */
    + strlen(ipAddressStr.val())
    + 20 /* max int len */
    + strlen(rtpmapLine)
    + strlen(rtcpmuxLine)
    + strlen(rangeLine)
    + strlen(auxSDPLine)
    + strlen(trackId());
  char* sdpLines = new char[sdpFmtSize];
  sprintf(sdpLines, sdpFmt,
      mediaType, // m= <media>
      fPortNumForSDP, // m= <port>
      rtpPayloadType, // m= <fmt list>
      ipAddressStr.val(), // c= address
      estBitrate, // b=AS:<bandwidth>
      rtpmapLine, // a=rtpmap:... (if present)
      rtcpmuxLine, // a=rtcp-mux:... (if present)
      rangeLine, // a=range:... (if present)
      auxSDPLine, // optional extra SDP line
      trackId()); // a=control:<track-id>
  delete[] (char*)rangeLine; delete[] rtpmapLine;

  fSDPLines = strDup(sdpLines);
  delete[] sdpLines;
}

setSDPLinesFromRTPSink obtains all of this information through the RTPSink object; the most involved part is obtaining the auxSDPLine, and that function is overridden in H264VideoFileServerMediaSubsession. Since the media resource we are analyzing is a .264 file, let us look at H264VideoFileServerMediaSubsession::getAuxSDPLine:

char const* H264VideoFileServerMediaSubsession::getAuxSDPLine(RTPSink* rtpSink, FramedSource* inputSource) {
  if (fAuxSDPLine != NULL) return fAuxSDPLine; // it's already been set up (for a previous client)

  if (fDummyRTPSink == NULL) { // we're not already setting it up for another, concurrent stream
    // Note: For H264 video files, the 'config' information ("profile-level-id" and "sprop-parameter-sets") isn't known
    // until we start reading the file.  This means that "rtpSink"s "auxSDPLine()" will be NULL initially,
    // and we need to start reading data from our file until this changes.
    fDummyRTPSink = rtpSink;

    // Start reading the file. For file-based ServerMediaSubsessions, Live555 "plays" part of the
    // file through the RTPSink just to derive the SDP information:
    fDummyRTPSink->startPlaying(*inputSource, afterPlayingDummy, this);

    // Check whether the sink's 'auxSDPLine()' is ready:
    checkForAuxSDPLine(this);
  }

  envir().taskScheduler().doEventLoop(&fDoneFlag); // fDoneFlag starts as 0; loop here until the SDP info has been derived

  return fAuxSDPLine;
}
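The doEventLoop(&fDoneFlag) call above uses Live555's watch-variable idiom: doEventLoop keeps dispatching socket events and delayed tasks while *watchVariable stays 0, and returns as soon as some task sets it non-zero. In miniature (a sketch of the idiom, not Live555 source; assume env is the UsageEnvironment created at startup, as in testOnDemandRTSPServer.cpp):

char watchVariable = 0;      // fDoneFlag plays this role in getAuxSDPLine

// Some task scheduled on the event loop eventually executes the equivalent of
//   watchVariable = ~0;     // which is all that setDoneFlag() does to fDoneFlag

env->taskScheduler().doEventLoop(&watchVariable);
// doEventLoop blocks here (while running tasks) until watchVariable becomes non-zero,
// then returns and execution resumes after this line.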

This function calls the RTPSink's startPlaying to start reading data, and calls H264VideoFileServerMediaSubsession::checkForAuxSDPLine to check whether the SDP information has been derived from the data read so far. Here is checkForAuxSDPLine:

static void checkForAuxSDPLine(void* clientData) {
  H264VideoFileServerMediaSubsession* subsess = (H264VideoFileServerMediaSubsession*)clientData;
  subsess->checkForAuxSDPLine1();
}

void H264VideoFileServerMediaSubsession::checkForAuxSDPLine1() {
  char const* dasl;

  if (fAuxSDPLine != NULL) { // the SDP info has already been derived
    // Signal the event loop that we're done:
    setDoneFlag(); // make getAuxSDPLine exit its waiting loop
  } else if (fDummyRTPSink != NULL && (dasl = fDummyRTPSink->auxSDPLine()) != NULL) {
    // Not derived yet; the RTPSink's auxSDPLine() has now produced it:
    fAuxSDPLine = strDup(dasl);
    fDummyRTPSink = NULL;

    // Signal the event loop that we're done:
    setDoneFlag(); // the SDP info was derived, so exit the waiting loop
  } else if (!fDoneFlag) {
    // Still no SDP info; try again after a brief delay:
    int uSecsToDelay = 100000; // 100 ms
    nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecsToDelay,
                  (TaskFunc*)checkForAuxSDPLine, this);
  }
}

If the check above finds that the SDP information is not yet available, H264VideoRTPSink::auxSDPLine is called to try to derive it again. Here is auxSDPLine:

char const* H264VideoRTPSink::auxSDPLine() {
  // Generate a new "a=fmtp:" line each time, using our SPS and PPS (if we have them),
  // otherwise parameters from our framer source (in case they've changed since the last time that
  // we were called):
  H264or5VideoStreamFramer* framerSource = NULL;
  u_int8_t* vpsDummy = NULL; unsigned vpsDummySize = 0;
  u_int8_t* sps = fSPS; unsigned spsSize = fSPSSize;
  u_int8_t* pps = fPPS; unsigned ppsSize = fPPSSize;
  if (sps == NULL || pps == NULL) {
    // We need to get SPS and PPS from our framer source:
    // (fOurFragmenter is created once startPlaying has been called)
    if (fOurFragmenter == NULL) return NULL; // we don't yet have a fragmenter (and therefore not a source)
    framerSource = (H264or5VideoStreamFramer*)(fOurFragmenter->inputSource());
    if (framerSource == NULL) return NULL; // we don't yet have a source

    framerSource->getVPSandSPSandPPS(vpsDummy, vpsDummySize, sps, spsSize, pps, ppsSize);
    if (sps == NULL || pps == NULL) return NULL; // our source isn't ready
  }

  // Data has been read from the file successfully; now derive the SDP info.
  // Set up the "a=fmtp:" SDP line for this stream:
  u_int8_t* spsWEB = new u_int8_t[spsSize]; // "WEB" means "Without Emulation Bytes"
  unsigned spsWEBSize = removeH264or5EmulationBytes(spsWEB, spsSize, sps, spsSize);
  if (spsWEBSize < 4) { // Bad SPS size => assume our source isn't ready
    delete[] spsWEB;
    return NULL;
  }
  u_int32_t profileLevelId = (spsWEB[1]<<16) | (spsWEB[2]<<8) | spsWEB[3];
  delete[] spsWEB;

  char* sps_base64 = base64Encode((char*)sps, spsSize);
  char* pps_base64 = base64Encode((char*)pps, ppsSize);

  char const* fmtpFmt =
    "a=fmtp:%d packetization-mode=1"
    ";profile-level-id=%06X"
    ";sprop-parameter-sets=%s,%s\r\n";
  unsigned fmtpFmtSize = strlen(fmtpFmt)
    + 3 /* max char len */
    + 6 /* 3 bytes in hex */
    + strlen(sps_base64) + strlen(pps_base64);
  char* fmtp = new char[fmtpFmtSize];
  sprintf(fmtp, fmtpFmt,
      rtpPayloadType(),
      profileLevelId,
      sps_base64, pps_base64);

  delete[] sps_base64;
  delete[] pps_base64;

  delete[] fFmtpSDPLine; fFmtpSDPLine = fmtp;
  return fFmtpSDPLine;
}

At this point the RTSPServer has successfully handled the client's DESCRIBE command and returned the SDP information. The client then sends one SETUP command per ServerMediaSubsession, requesting that a stream be set up for that subsession; on receipt the server calls RTSPClientSession::handleCmd_SETUP. RTSPClientSession is the class the server uses to maintain a session with one client; SETUP, PLAY, PAUSE, TEARDOWN and the other in-session commands are all handled in RTSPClientSession. Like RTSPClientConnection, it is a nested class of RTSPServer. Here is the class:

// The state of an individual client session (using one or more sequential TCP connections) handled by a RTSP server:
class RTSPClientSession {
protected:
  RTSPClientSession(RTSPServer& ourServer, u_int32_t sessionId);
  virtual ~RTSPClientSession();

  friend class RTSPServer;
  friend class RTSPClientConnection;
  // Make the handler functions for each command virtual, to allow subclasses to redefine them:
  virtual void handleCmd_SETUP(RTSPClientConnection* ourClientConnection, // handles the SETUP command
               char const* urlPreSuffix, char const* urlSuffix, char const* fullRequestStr);
  virtual void handleCmd_withinSession(RTSPClientConnection* ourClientConnection,
                   char const* cmdName,
                   char const* urlPreSuffix, char const* urlSuffix,
                   char const* fullRequestStr);
  virtual void handleCmd_TEARDOWN(RTSPClientConnection* ourClientConnection, // handles the TEARDOWN command (ends the session)
                  ServerMediaSubsession* subsession);
  virtual void handleCmd_PLAY(RTSPClientConnection* ourClientConnection, // handles the PLAY command
              ServerMediaSubsession* subsession, char const* fullRequestStr);
  virtual void handleCmd_PAUSE(RTSPClientConnection* ourClientConnection, // handles the PAUSE command
               ServerMediaSubsession* subsession);
  virtual void handleCmd_GET_PARAMETER(RTSPClientConnection* ourClientConnection,
                   ServerMediaSubsession* subsession, char const* fullRequestStr);
  virtual void handleCmd_SET_PARAMETER(RTSPClientConnection* ourClientConnection,
                   ServerMediaSubsession* subsession, char const* fullRequestStr);
protected:
  UsageEnvironment& envir() { return fOurServer.envir(); }
  void reclaimStreamStates();
  Boolean isMulticast() const { return fIsMulticast; }
  void noteLiveness();
  static void noteClientLiveness(RTSPClientSession* clientSession); // the liveness "heartbeat" with the client
  static void livenessTimeoutTask(RTSPClientSession* clientSession);

  // Shortcuts for setting up a RTSP response (prior to sending it):
  void setRTSPResponse(RTSPClientConnection* ourClientConnection, char const* responseStr) { ourClientConnection->setRTSPResponse(responseStr); }
  void setRTSPResponse(RTSPClientConnection* ourClientConnection, char const* responseStr, u_int32_t sessionId) { ourClientConnection->setRTSPResponse(responseStr, sessionId); }
  void setRTSPResponse(RTSPClientConnection* ourClientConnection, char const* responseStr, char const* contentStr) { ourClientConnection->setRTSPResponse(responseStr, contentStr); }
  void setRTSPResponse(RTSPClientConnection* ourClientConnection, char const* responseStr, u_int32_t sessionId, char const* contentStr) { ourClientConnection->setRTSPResponse(responseStr, sessionId, contentStr); }

protected:
  RTSPServer& fOurServer;
  u_int32_t fOurSessionId;
  ServerMediaSession* fOurServerMediaSession;
  Boolean fIsMulticast, fStreamAfterSETUP;
  unsigned char fTCPStreamIdCount; // used for (optional) RTP/TCP
  Boolean usesTCPTransport() const { return fTCPStreamIdCount > 0; }
  TaskToken fLivenessCheckTask;
  unsigned fNumStreamStates; // the number of streamState entries
  struct streamState { // pairs a requested ServerMediaSubsession with its corresponding StreamState object
    ServerMediaSubsession* subsession;
    void* streamToken; // points to a StreamState object
  } * fStreamStates; // fStreamStates is an array of streamState
};

/* As its name suggests, StreamState holds the server-side state of streaming one ServerMediaSubsession
   (including serverRTPPort, serverRTCPPort, rtpSink, mediaSource, etc.). When a client SETUPs a
   ServerMediaSubsession, the server creates a StreamState object together with the server-side sockets,
   RTPSink and FramedSource, so that everything is ready for playback. When a ServerMediaSubsession is
   created (see the main function of testOnDemandRTSPServer.cpp), a reuseFirstSource parameter is passed
   in. If reuseFirstSource is true, all clients requesting this ServerMediaSubsession share a single
   StreamState, i.e. the server uses one RTP port, RTCP port, RTPSink and FramedSource to serve all of
   them (one-to-many, saving server resources). If reuseFirstSource is false, the server creates a
   separate StreamState for each request for the ServerMediaSubsession (many-to-many, consuming more
   server resources). */
class StreamState { // represents one streaming instance of a ServerMediaSubsession on the server, and holds its state
public:
  StreamState(OnDemandServerMediaSubsession& master,
              Port const& serverRTPPort, Port const& serverRTCPPort,
              RTPSink* rtpSink, BasicUDPSink* udpSink,
              unsigned totalBW, FramedSource* mediaSource,
              Groupsock* rtpGS, Groupsock* rtcpGS);
  virtual ~StreamState();

  void startPlaying(Destinations* destinations, // on receiving PLAY, the server calls each StreamState's startPlaying to start streaming a ServerMediaSubsession
            TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
            ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
            void* serverRequestAlternativeByteHandlerClientData);
  void pause();
  void endPlaying(Destinations* destinations); // stop streaming
  void reclaim();

  unsigned& referenceCount() { return fReferenceCount; } // the number of clients referencing this stream

  Port const& serverRTPPort() const { return fServerRTPPort; }
  Port const& serverRTCPPort() const { return fServerRTCPPort; }

  RTPSink* rtpSink() const { return fRTPSink; }

  float streamDuration() const { return fStreamDuration; }

  FramedSource* mediaSource() const { return fMediaSource; }
  float& startNPT() { return fStartNPT; }

private:
  OnDemandServerMediaSubsession& fMaster;
  Boolean fAreCurrentlyPlaying;
  unsigned fReferenceCount;

  Port fServerRTPPort, fServerRTCPPort;

  RTPSink* fRTPSink;
  BasicUDPSink* fUDPSink;

  float fStreamDuration;
  unsigned fTotalBW;
  RTCPInstance* fRTCPInstance;

  FramedSource* fMediaSource;
  float fStartNPT; // initial 'normal play time'; reset after each seek

  Groupsock* fRTPgs;
  Groupsock* fRTCPgs;
};
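The reuseFirstSource choice is made when the subsessions are created; the pattern in testOnDemandRTSPServer.cpp is roughly the following (paraphrased from that demo program; descriptionString is the demo's session description text):

Boolean reuseFirstSource = True; // one StreamState shared by all clients of each subsession

char const* streamName = "h264ESVideoTest";
char const* inputFileName = "test.264";
ServerMediaSession* sms
  = ServerMediaSession::createNew(*env, streamName, streamName, descriptionString);
sms->addSubsession(H264VideoFileServerMediaSubsession
           ::createNew(*env, inputFileName, reuseFirstSource));
rtspServer->addServerMediaSession(sms);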

Next, in handleCmd_SETUP the server first finds the requested ServerMediaSession, then the requested ServerMediaSubsession, then extracts some client parameters from the request (such as the client's RTP and RTCP ports), and finally calls OnDemandServerMediaSubsession::getStreamParameters to set up the RTP and RTCP transport. Here is getStreamParameters:

void OnDemandServerMediaSubsession
::getStreamParameters(unsigned clientSessionId, netAddressBits clientAddress, Port const& clientRTPPort,
                      Port const& clientRTCPPort, int tcpSocketNum, unsigned char rtpChannelId,
                      unsigned char rtcpChannelId, netAddressBits& destinationAddress, u_int8_t& /*destinationTTL*/,
                      Boolean& isMulticast, Port& serverRTPPort, Port& serverRTCPPort, void*& streamToken) {
  if (destinationAddress == 0) destinationAddress = clientAddress;
  struct in_addr destinationAddr; destinationAddr.s_addr = destinationAddress;
  isMulticast = False;

  if (fLastStreamToken != NULL && fReuseFirstSource) { // if fReuseFirstSource is true, reuse the StreamState created earlier
    // Special case: Rather than creating a new 'StreamState',
    // we reuse the one that we've already created:
    serverRTPPort = ((StreamState*)fLastStreamToken)->serverRTPPort();
    serverRTCPPort = ((StreamState*)fLastStreamToken)->serverRTCPPort();
    ++((StreamState*)fLastStreamToken)->referenceCount();
    streamToken = fLastStreamToken;
  } else { // no StreamState has been created for this ServerMediaSubsession yet, or fReuseFirstSource is false
    // Normal case: Create a new media source.
    // (This actually calls the subclass's createNewStreamSource; for H264VideoFileServerMediaSubsession
    //  that is a ByteStreamFileSource wrapped in an H264VideoStreamFramer.)
    unsigned streamBitrate;
    FramedSource* mediaSource
      = createNewStreamSource(clientSessionId, streamBitrate);

    // Create 'groupsock' and 'sink' objects for the destination,
    // using previously unused server port numbers:
    RTPSink* rtpSink = NULL;
    BasicUDPSink* udpSink = NULL;
    Groupsock* rtpGroupsock = NULL;
    Groupsock* rtcpGroupsock = NULL;

    if (clientRTPPort.num() != 0 || tcpSocketNum >= 0) { // Normal case: Create destinations
      portNumBits serverPortNum;
      if (clientRTCPPort.num() == 0) {
        // We're streaming raw UDP (not RTP). Create a single groupsock:
        NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
        for (serverPortNum = fInitialPortNum; ; ++serverPortNum) {
          struct in_addr dummyAddr; dummyAddr.s_addr = 0;

          serverRTPPort = serverPortNum;
          rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
          if (rtpGroupsock->socketNum() >= 0) break; // success
        }

        udpSink = BasicUDPSink::createNew(envir(), rtpGroupsock);
      } else { // create a pair of server-side sockets (rtpGroupsock and rtcpGroupsock) to carry the RTP and RTCP packets
        // Normal case: We're streaming RTP (over UDP or TCP).  Create a pair of
        // groupsocks (RTP and RTCP), with adjacent port numbers (RTP port number even).
        // (If we're multiplexing RTCP and RTP over the same port number, it can be odd or even.)
        NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
        for (portNumBits serverPortNum = fInitialPortNum; ; ++serverPortNum) {
          struct in_addr dummyAddr; dummyAddr.s_addr = 0;

          serverRTPPort = serverPortNum;
          rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
          if (rtpGroupsock->socketNum() < 0) {
            delete rtpGroupsock;
            continue; // try again
          }

          if (fMultiplexRTCPWithRTP) {
            // Use the RTP 'groupsock' object for RTCP as well:
            serverRTCPPort = serverRTPPort;
            rtcpGroupsock = rtpGroupsock;
          } else {
            // Create a separate 'groupsock' object (with the next (odd) port number) for RTCP:
            serverRTCPPort = ++serverPortNum;
            rtcpGroupsock = new Groupsock(envir(), dummyAddr, serverRTCPPort, 255);
            if (rtcpGroupsock->socketNum() < 0) {
              delete rtpGroupsock;
              delete rtcpGroupsock;
              continue; // try again
            }
          }

          break; // success
        }

        unsigned char rtpPayloadType = 96 + trackNumber()-1; // if dynamic
        // Create the RTPSink. (This actually calls the subclass's createNewRTPSink;
        //  for H264VideoFileServerMediaSubsession that is an H264VideoRTPSink.)
        rtpSink = createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource);
        if (rtpSink != NULL && rtpSink->estimatedBitrate() > 0) streamBitrate = rtpSink->estimatedBitrate();
      }

      // Turn off the destinations for each groupsock.  They'll get set later
      // (unless TCP is used instead):
      if (rtpGroupsock != NULL) rtpGroupsock->removeAllDestinations();
      if (rtcpGroupsock != NULL) rtcpGroupsock->removeAllDestinations();

      if (rtpGroupsock != NULL) {
        // Try to use a big send buffer for RTP - at least 0.1 second of
        // specified bandwidth and at least 50 KB
        unsigned rtpBufSize = streamBitrate * 25 / 2; // 1 kbps * 0.1 s = 12.5 bytes
        if (rtpBufSize < 50 * 1024) rtpBufSize = 50 * 1024;
        increaseSendBufferTo(envir(), rtpGroupsock->socketNum(), rtpBufSize);
      }
    }

    // Set up the state of the stream.  The stream will get started later:
    streamToken = fLastStreamToken
      = new StreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink,
                        streamBitrate, mediaSource,
                        rtpGroupsock, rtcpGroupsock);
  }

  // Record these destinations as being for this client session id:
  Destinations* destinations;
  if (tcpSocketNum < 0) { // UDP
    destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort);
  } else { // TCP
    destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId);
  }
  fDestinationsHashTable->Add((char const*)clientSessionId, destinations);
}
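On the wire, the SETUP exchange that drives getStreamParameters looks roughly like this (a hand-written illustration; the ports, addresses and session id are made up):

C->S: SETUP rtsp://127.0.0.1:8554/test.264/track1 RTSP/1.0
      CSeq: 4
      Transport: RTP/AVP;unicast;client_port=50000-50001

S->C: RTSP/1.0 200 OK
      CSeq: 4
      Transport: RTP/AVP;unicast;destination=127.0.0.1;source=127.0.0.1;client_port=50000-50001;server_port=6970-6971
      Session: 0000140B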


After the steps above, the server is ready to send RTP and RTCP packets to the client, and transmission begins once the client sends the PLAY command. On receiving PLAY, the server calls RTSPClientSession::handleCmd_PLAY. That function first extracts the Scale header, the playback speed the client wants (normal, fast forward, rewind), then the Range header, the start/stop range the client wants; based on these two parameters it calls ServerMediaSubsession::setStreamScale and ServerMediaSubsession::seekStream respectively, and finally calls ServerMediaSubsession::startStream to begin transmitting data.
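A PLAY request carrying both headers might look like this (illustrative values; Scale is optional and most players omit it):

C->S: PLAY rtsp://127.0.0.1:8554/test.264/ RTSP/1.0
      CSeq: 5
      Session: 0000140B
      Scale: 1.0
      Range: npt=0.000-

The startStream call actually lands in OnDemandServerMediaSubsession::startStream; here is that function: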


void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId, void* streamToken,
                        TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
                        unsigned short& rtpSeqNum, unsigned& rtpTimestamp,
                        ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
                        void* serverRequestAlternativeByteHandlerClientData) {
  StreamState* streamState = (StreamState*)streamToken;
  Destinations* destinations
    = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId)); // look up the destination client's address
  if (streamState != NULL) {
    streamState->startPlaying(destinations,
                  rtcpRRHandler, rtcpRRHandlerClientData,
                  serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData); // call StreamState::startPlaying to start streaming

    RTPSink* rtpSink = streamState->rtpSink(); // alias
    if (rtpSink != NULL) {
      rtpSeqNum = rtpSink->currentSeqNo();
      rtpTimestamp = rtpSink->presetNextTimestamp();
    }
  }
}

OnDemandServerMediaSubsession::startStream mainly calls StreamState::startPlaying; let us look at that function:

void StreamState
::startPlaying(Destinations* dests,
           TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
           ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
           void* serverRequestAlternativeByteHandlerClientData) {
  if (dests == NULL) return;

  if (fRTCPInstance == NULL && fRTPSink != NULL) {
    // Create (and start) a 'RTCP instance' for this RTP sink:
    fRTCPInstance
      = RTCPInstance::createNew(fRTPSink->envir(), fRTCPgs,
                fTotalBW, (unsigned char*)fMaster.fCNAME,
                fRTPSink, NULL /* we're a server */);
        // Note: This starts RTCP running automatically
  }

  if (dests->isTCP) { // transport the RTP and RTCP packets over TCP
    // Change RTP and RTCP to use the TCP socket instead of UDP:
    if (fRTPSink != NULL) {
      fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
      RTPInterface
        ::setServerRequestAlternativeByteHandler(fRTPSink->envir(), dests->tcpSocketNum,
                         serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
        // So that we continue to handle RTSP commands from the client
    }
    if (fRTCPInstance != NULL) {
      fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
      fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId,
                      rtcpRRHandler, rtcpRRHandlerClientData);
    }
  } else { // transport the RTP and RTCP packets over UDP
    // Tell the RTP and RTCP 'groupsocks' about this destination
    // (in case they don't already have it):
    if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort);
    if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort);
    if (fRTCPInstance != NULL) {
      fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
                      rtcpRRHandler, rtcpRRHandlerClientData);
    }
  }

  if (fRTCPInstance != NULL) {
    // Hack: Send an initial RTCP "SR" packet, before the initial RTP packet, so that receivers will (likely) be able to
    // get RTCP-synchronized presentation times immediately:
    fRTCPInstance->sendReport();
  }

  if (!fAreCurrentlyPlaying && fMediaSource != NULL) { // call RTPSink::startPlaying to start sending data
    if (fRTPSink != NULL) {
      fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
      fAreCurrentlyPlaying = True;
    } else if (fUDPSink != NULL) {
      fUDPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
      fAreCurrentlyPlaying = True;
    }
  }
}

In StreamState::startPlaying, the client is added to the list of destinations, and then RTPSink::startPlaying is called; what actually runs is MediaSink::startPlaying:

Boolean MediaSink::startPlaying(MediaSource& source,
                afterPlayingFunc* afterFunc,
                void* afterClientData) {
  // Make sure we're not already being played:
  if (fSource != NULL) {
    envir().setResultMsg("This sink is already being played");
    return False;
  }

  // Make sure our source is compatible:
  if (!sourceIsCompatibleWithUs(source)) {
    envir().setResultMsg("MediaSink::startPlaying(): source is not compatible!");
    return False;
  }
  fSource = (FramedSource*)&source;

  fAfterFunc = afterFunc; // once the callbacks are set up, call continuePlaying to start playing
  fAfterClientData = afterClientData;
  return continuePlaying();
}

/* Our media resource here is a .264 file, served by H264VideoFileServerMediaSubsession, whose
   createNewRTPSink creates an H264VideoRTPSink. H264VideoRTPSink is a subclass of H264or5VideoRTPSink,
   so the continuePlaying() call above actually runs H264or5VideoRTPSink::continuePlaying: */
Boolean H264or5VideoRTPSink::continuePlaying() {
  // First, check whether we have a 'fragmenter' class set up yet.
  // If not, create it now (an H264or5Fragmenter object):
  if (fOurFragmenter == NULL) {
    fOurFragmenter = new H264or5Fragmenter(fHNumber, envir(), fSource, OutPacketBuffer::maxSize,
                       ourMaxPacketSize() - 12/*RTP hdr size*/);
  } else {
    fOurFragmenter->reassignInputSource(fSource);
  }
  fSource = fOurFragmenter; // note: fSource is now the H264or5Fragmenter, a subclass of FramedFilter
  // FramedFilter is a subclass of FramedSource. As the name suggests, a FramedFilter "filters" the data
  // delivered by an upstream FramedSource, and its output can in turn be fed to another FramedFilter for
  // further filtering; this is similar to Java's decorated IO streams (the decorator pattern).

  // Then call the parent class's implementation:
  return MultiFramedRTPSink::continuePlaying(); // calls MultiFramedRTPSink::continuePlaying
}
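For completeness, the source chain feeding this sink is built in H264VideoFileServerMediaSubsession::createNewStreamSource, which (condensed from the Live555 source, with the file-size bookkeeping omitted) looks like this:

FramedSource* H264VideoFileServerMediaSubsession
::createNewStreamSource(unsigned /*clientSessionId*/, unsigned& estBitrate) {
  estBitrate = 500; // kbps, estimate

  // Create the raw byte source for the file:
  ByteStreamFileSource* fileSource = ByteStreamFileSource::createNew(envir(), fFileName);
  if (fileSource == NULL) return NULL;

  // Wrap it in a framer that splits the byte stream into H.264 NAL units:
  return H264VideoStreamFramer::createNew(envir(), fileSource);
}

So the full pipeline is ByteStreamFileSource -> H264VideoStreamFramer -> H264or5Fragmenter -> MultiFramedRTPSink. Now the parent class's continuePlaying: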

Boolean MultiFramedRTPSink::continuePlaying() {
  // Send the first packet.
  // (This will also schedule any future sends.)
  buildAndSendPacket(True);
  return True;
}

void MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket) {
  fIsFirstPacket = isFirstPacket;

  // Set up the RTP header:
  unsigned rtpHdr = 0x80000000; // RTP version 2; marker ('M') bit not set (by default; it can be set later)
  rtpHdr |= (fRTPPayloadType<<16);
  rtpHdr |= fSeqNo; // sequence number
  fOutBuf->enqueueWord(rtpHdr);

  // Note where the RTP timestamp will go.
  // (We can't fill this in until we start packing payload frames.)
  fTimestampPosition = fOutBuf->curPacketSize();
  fOutBuf->skipBytes(4); // leave a hole for the timestamp

  fOutBuf->enqueueWord(SSRC());

  // Allow for a special, payload-format-specific header following the
  // RTP header:
  fSpecialHeaderPosition = fOutBuf->curPacketSize();
  fSpecialHeaderSize = specialHeaderSize();
  fOutBuf->skipBytes(fSpecialHeaderSize);

  // Begin packing as many (complete) frames into the packet as we can:
  fTotalFrameSpecificHeaderSizes = 0;
  fNoFramesLeft = False;
  fNumFramesUsedSoFar = 0;
  packFrame(); // call packFrame
}

void MultiFramedRTPSink::packFrame() {
  // Get the next frame.

  // First, see if we have an overflow frame that was too big for the last pkt
  if (fOutBuf->haveOverflowData()) {
    // Use this frame before reading a new one from the source
    unsigned frameSize = fOutBuf->overflowDataSize();
    struct timeval presentationTime = fOutBuf->overflowPresentationTime();
    unsigned durationInMicroseconds = fOutBuf->overflowDurationInMicroseconds();
    fOutBuf->useOverflowData();

    afterGettingFrame1(frameSize, 0, presentationTime, durationInMicroseconds);
  } else {
    // Normal case: we need to read a new frame from the source
    if (fSource == NULL) return;

    fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize();
    fCurFrameSpecificHeaderSize = frameSpecificHeaderSize();
    fOutBuf->skipBytes(fCurFrameSpecificHeaderSize);
    fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize;

    fSource->getNextFrame(fOutBuf->curPtr(), fOutBuf->totalBytesAvailable(), // call FramedSource::getNextFrame to read frame data into fOutBuf;
              afterGettingFrame, this, ourHandleClosure, this);      // MultiFramedRTPSink::afterGettingFrame is called back once the data arrives
    // (for H264VideoFileServerMediaSubsession it is the H264or5Fragmenter that eventually calls back MultiFramedRTPSink::afterGettingFrame)
  }
}

void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize,
                afterGettingFunc* afterGettingFunc, void* afterGettingClientData,
                onCloseFunc* onCloseFunc, void* onCloseClientData) {
  // Make sure we're not already being read:
  if (fIsCurrentlyAwaitingData) {
    envir() << "FramedSource[" << this << "]::getNextFrame(): attempting to read more than once at the same time!\n";
    envir().internalError();
  }

  fTo = to;
  fMaxSize = maxSize;
  fNumTruncatedBytes = 0; // by default; could be changed by doGetNextFrame()
  fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame()
  fAfterGettingFunc = afterGettingFunc;
  fAfterGettingClientData = afterGettingClientData;
  fOnCloseFunc = onCloseFunc;
  fOnCloseClientData = onCloseClientData;
  fIsCurrentlyAwaitingData = True;

  doGetNextFrame(); // call doGetNextFrame
}

For H264VideoFileServerMediaSubsession, the FramedSource created earlier is a ByteStreamFileSource (wrapped in an H264VideoStreamFramer), and continuePlaying then wrapped that source in an H264or5Fragmenter; therefore the doGetNextFrame call above lands in H264or5Fragmenter::doGetNextFrame:

void H264or5Fragmenter::doGetNextFrame() {
  if (fNumValidDataBytes == 1) {
    // We have no NAL unit data currently in the buffer.  Read a new one:
    // (this calls the upstream source's doGetNextFrame, and registers
    //  H264or5Fragmenter::afterGettingFrame as the completion callback)
    fInputSource->getNextFrame(&fInputBuffer[1], fInputBufferSize - 1,
                   afterGettingFrame, this,
                   FramedSource::handleClosure, this);
  } else {
    // We have NAL unit data in the buffer.  There are three cases to consider:
    // 1. There is a new NAL unit in the buffer, and it's small enough to deliver
    //    to the RTP sink (as is).
    // 2. There is a new NAL unit in the buffer, but it's too large to deliver to
    //    the RTP sink in its entirety.  Deliver the first fragment of this data,
    //    as a FU packet, with one extra preceding header byte (for the "FU header").
    // 3. There is a NAL unit in the buffer, and we've already delivered some
    //    fragment(s) of this.  Deliver the next fragment of this data,
    //    as a FU packet, with two (H.264) or three (H.265) extra preceding header bytes
    //    (for the "NAL header" and the "FU header").

    if (fMaxSize < fMaxOutputPacketSize) { // shouldn't happen
      envir() << "H264or5Fragmenter::doGetNextFrame(): fMaxSize ("
          << fMaxSize << ") is smaller than expected\n";
    } else {
      fMaxSize = fMaxOutputPacketSize;
    }

    fLastFragmentCompletedNALUnit = True; // by default
    if (fCurDataOffset == 1) { // case 1 or 2
      if (fNumValidDataBytes - 1 <= fMaxSize) { // case 1
        memmove(fTo, &fInputBuffer[1], fNumValidDataBytes - 1);
        fFrameSize = fNumValidDataBytes - 1;
        fCurDataOffset = fNumValidDataBytes;
      } else { // case 2
        // We need to send the NAL unit data as FU packets.  Deliver the first
        // packet now.  Note that we add "NAL header" and "FU header" bytes to the front
        // of the packet (overwriting the existing "NAL header").
        if (fHNumber == 264) {
          fInputBuffer[0] = (fInputBuffer[1] & 0xE0) | 28; // FU indicator
          fInputBuffer[1] = 0x80 | (fInputBuffer[1] & 0x1F); // FU header (with S bit)
        } else { // 265
          u_int8_t nal_unit_type = (fInputBuffer[1]&0x7E)>>1;
          fInputBuffer[0] = (fInputBuffer[1] & 0x81) | (49<<1); // Payload header (1st byte)
          fInputBuffer[1] = fInputBuffer[2]; // Payload header (2nd byte)
          fInputBuffer[2] = 0x80 | nal_unit_type; // FU header (with S bit)
        }
        memmove(fTo, fInputBuffer, fMaxSize);
        fFrameSize = fMaxSize;
        fCurDataOffset += fMaxSize - 1;
        fLastFragmentCompletedNALUnit = False;
      }
    } else { // case 3
      // We are sending this NAL unit data as FU packets.  We've already sent the
      // first packet (fragment).  Now, send the next fragment.  Note that we add
      // "NAL header" and "FU header" bytes to the front.  (We reuse these bytes that
      // we already sent for the first fragment, but clear the S bit, and add the E
      // bit if this is the last fragment.)
      unsigned numExtraHeaderBytes;
      if (fHNumber == 264) {
        fInputBuffer[fCurDataOffset-2] = fInputBuffer[0]; // FU indicator
        fInputBuffer[fCurDataOffset-1] = fInputBuffer[1]&~0x80; // FU header (no S bit)
        numExtraHeaderBytes = 2;
      } else { // 265
        fInputBuffer[fCurDataOffset-3] = fInputBuffer[0]; // Payload header (1st byte)
        fInputBuffer[fCurDataOffset-2] = fInputBuffer[1]; // Payload header (2nd byte)
        fInputBuffer[fCurDataOffset-1] = fInputBuffer[2]&~0x80; // FU header (no S bit)
        numExtraHeaderBytes = 3;
      }
      unsigned numBytesToSend = numExtraHeaderBytes + (fNumValidDataBytes - fCurDataOffset);
      if (numBytesToSend > fMaxSize) {
        // We can't send all of the remaining data this time:
        numBytesToSend = fMaxSize;
        fLastFragmentCompletedNALUnit = False;
      } else {
        // This is the last fragment:
        fInputBuffer[fCurDataOffset-1] |= 0x40; // set the E bit in the FU header
        fNumTruncatedBytes = fSaveNumTruncatedBytes;
      }
      memmove(fTo, &fInputBuffer[fCurDataOffset-numExtraHeaderBytes], numBytesToSend);
      fFrameSize = numBytesToSend;
      fCurDataOffset += numBytesToSend - numExtraHeaderBytes;
    }

    if (fCurDataOffset >= fNumValidDataBytes) {
      // We're done with this data.  Reset the pointers for receiving new data:
      fNumValidDataBytes = fCurDataOffset = 1;
    }

    // Complete delivery to the client:
    FramedSource::afterGetting(this); // calls back MultiFramedRTPSink::afterGettingFrame
  }
}
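As a quick sanity check of the case-2 bit manipulation above, consider an H.264 IDR NAL unit whose first byte is 0x65 (F=0, NRI=3, type=5). A worked example of the header bytes the fragmenter produces:

unsigned char nalHeader     = 0x65;                      // F|NRI|type = 0|11|00101 (IDR slice)
unsigned char fuIndicator   = (nalHeader & 0xE0) | 28;   // 0x60 | 28 = 0x7C (type 28 = FU-A)
unsigned char fuHeaderFirst = 0x80 | (nalHeader & 0x1F); // 0x85: S bit set, original type 5
unsigned char fuHeaderMid   = fuHeaderFirst & ~0x80;     // 0x05: S bit cleared for middle fragments
unsigned char fuHeaderLast  = fuHeaderMid | 0x40;        // 0x45: E bit set on the final fragment

Meanwhile the read bottoms out in ByteStreamFileSource, whose doGetNextFrame the fragmenter invoked above: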

void ByteStreamFileSource::doGetNextFrame() {
  if (feof(fFid) || ferror(fFid) || (fLimitNumBytesToStream && fNumBytesToStream == 0)) {
    handleClosure();
    return;
  }

#ifdef READ_FROM_FILES_SYNCHRONOUSLY
  doReadFromFile(); // read the file
#else
  if (!fHaveStartedReading) {
    // Await readable data from the file:
    envir().taskScheduler().turnOnBackgroundReadHandling(fileno(fFid),
           (TaskScheduler::BackgroundHandlerProc*)&fileReadableHandler, this);
    fHaveStartedReading = True;
  }
#endif
}

void ByteStreamFileSource::doReadFromFile() {
  // Try to read as many bytes as will fit in the buffer provided (or "fPreferredFrameSize" if less)
  if (fLimitNumBytesToStream && fNumBytesToStream < (u_int64_t)fMaxSize) {
    fMaxSize = (unsigned)fNumBytesToStream;
  }
  if (fPreferredFrameSize > 0 && fPreferredFrameSize < fMaxSize) {
    fMaxSize = fPreferredFrameSize;
  }
#ifdef READ_FROM_FILES_SYNCHRONOUSLY
  fFrameSize = fread(fTo, 1, fMaxSize, fFid); // read the data with fread
#else
  if (fFidIsSeekable) {
    fFrameSize = fread(fTo, 1, fMaxSize, fFid);
  } else {
    // For non-seekable files (e.g., pipes), call "read()" rather than "fread()", to ensure that the read doesn't block:
    fFrameSize = read(fileno(fFid), fTo, fMaxSize);
  }
#endif
  if (fFrameSize == 0) {
    handleClosure();
    return;
  }
  fNumBytesToStream -= fFrameSize;

  // Set the 'presentation time':
  if (fPlayTimePerFrame > 0 && fPreferredFrameSize > 0) {
    if (fPresentationTime.tv_sec == 0 && fPresentationTime.tv_usec == 0) {
      // This is the first frame, so use the current time:
      gettimeofday(&fPresentationTime, NULL);
    } else {
      // Increment by the play time of the previous data:
      unsigned uSeconds = fPresentationTime.tv_usec + fLastPlayTime;
      fPresentationTime.tv_sec += uSeconds/1000000;
      fPresentationTime.tv_usec = uSeconds%1000000;
    }

    // Remember the play time of this data:
    fLastPlayTime = (fPlayTimePerFrame*fFrameSize)/fPreferredFrameSize;
    fDurationInMicroseconds = fLastPlayTime;
  } else {
    // We don't know a specific play time duration for this data,
    // so just record the current time as being the 'presentation time':
    gettimeofday(&fPresentationTime, NULL);
  }

  // Having read the data, call FramedSource::afterGetting.
  // Inform the reader that he has data:
#ifdef READ_FROM_FILES_SYNCHRONOUSLY
  // To avoid possible infinite recursion, we need to return to the event loop to do this:
  nextTask() = envir().taskScheduler().scheduleDelayedTask(0,
                (TaskFunc*)FramedSource::afterGetting, this);
#else
  // Because the file read was done from the event loop, we can call the
  // 'after getting' function directly, without risk of infinite recursion:
  FramedSource::afterGetting(this);
#endif
}

void FramedSource::afterGetting(FramedSource* source) {
  source->fIsCurrentlyAwaitingData = False;
      // indicates that we can be read again
      // Note that this needs to be done here, in case the "fAfterFunc"
      // called below tries to read another frame (which it usually will)

  if (source->fAfterGettingFunc != NULL) {
    (*(source->fAfterGettingFunc))(source->fAfterGettingClientData,
                   source->fFrameSize, source->fNumTruncatedBytes,
                   source->fPresentationTime,
                   source->fDurationInMicroseconds);
  }
}

 

FramedSource::afterGetting invokes fAfterGettingFunc; for the ByteStreamFileSource object, this was earlier set to H264or5Fragmenter::afterGettingFrame:

 

void H264or5Fragmenter::afterGettingFrame(void* clientData, unsigned frameSize,
                      unsigned numTruncatedBytes,
                      struct timeval presentationTime,
                      unsigned durationInMicroseconds) {
  H264or5Fragmenter* fragmenter = (H264or5Fragmenter*)clientData;
  fragmenter->afterGettingFrame1(frameSize, numTruncatedBytes, presentationTime,
                 durationInMicroseconds);
}

void H264or5Fragmenter::afterGettingFrame1(unsigned frameSize,
                       unsigned numTruncatedBytes,
                       struct timeval presentationTime,
                       unsigned durationInMicroseconds) {
  fNumValidDataBytes += frameSize;
  fSaveNumTruncatedBytes = numTruncatedBytes;
  fPresentationTime = presentationTime;
  fDurationInMicroseconds = durationInMicroseconds;

  // Deliver data to the client:
  doGetNextFrame();
}

H264or5Fragmenter::afterGettingFrame1 calls H264or5Fragmenter::doGetNextFrame again; there, once the frame data that was read satisfies one of the delivery cases, MultiFramedRTPSink::afterGettingFrame is called back.

void MultiFramedRTPSink
::afterGettingFrame(void* clientData, unsigned numBytesRead,
            unsigned numTruncatedBytes,
            struct timeval presentationTime,
            unsigned durationInMicroseconds) {
  MultiFramedRTPSink* sink = (MultiFramedRTPSink*)clientData;
  sink->afterGettingFrame1(numBytesRead, numTruncatedBytes,
               presentationTime, durationInMicroseconds);
}

void MultiFramedRTPSink
::afterGettingFrame1(unsigned frameSize, unsigned numTruncatedBytes,
             struct timeval presentationTime,
             unsigned durationInMicroseconds) {
  if (fIsFirstPacket) {
    // Record the fact that we're starting to play now:
    gettimeofday(&fNextSendTime, NULL);
  }

  fMostRecentPresentationTime = presentationTime;
  if (fInitialPresentationTime.tv_sec == 0 && fInitialPresentationTime.tv_usec == 0) {
    fInitialPresentationTime = presentationTime;
  }

  if (numTruncatedBytes > 0) { // data that exceeded the buffer size and had to be discarded
    unsigned const bufferSize = fOutBuf->totalBytesAvailable();
    envir() << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size ("
        << bufferSize << ").  "
        << numTruncatedBytes << " bytes of trailing data was dropped!  Correct this by increasing \"OutPacketBuffer::maxSize\" to at least "
        << OutPacketBuffer::maxSize + numTruncatedBytes << ", *before* creating this 'RTPSink'.  (Current value is "
        << OutPacketBuffer::maxSize << ".)\n";
  }
  unsigned curFragmentationOffset = fCurFragmentationOffset;
  unsigned numFrameBytesToUse = frameSize;
  unsigned overflowBytes = 0;

  // If we have already packed one or more frames into this packet,
  // check whether this new frame is eligible to be packed after them.
  // (This is independent of whether the packet has enough room for this
  // new frame; that check comes later.)
  if (fNumFramesUsedSoFar > 0) { // this RTP packet already contains some frames
    if ((fPreviousFrameEndedFragmentation
     && !allowOtherFramesAfterLastFragment())
    || !frameCanAppearAfterPacketStart(fOutBuf->curPtr(), frameSize)) {
      // No more frames may be added to this RTP packet (e.g. the previous frame was marked as the final fragment).
      // Save away this frame for next time:
      numFrameBytesToUse = 0;
      fOutBuf->setOverflowData(fOutBuf->curPacketSize(), frameSize,
                   presentationTime, durationInMicroseconds);
    }
  }
  fPreviousFrameEndedFragmentation = False;

  if (numFrameBytesToUse > 0) { // this frame may be added to the RTP packet, but check whether it fits in the packet's remaining room
    // Check whether this frame overflows the packet
    if (fOutBuf->wouldOverflow(frameSize)) { // this frame exceeds the RTP packet's remaining room
      // Don't use this frame now; instead, save it as overflow data, and
      // send it in the next packet instead.  However, if the frame is too
      // big to fit in a packet by itself, then we need to fragment it (and
      // use some of it in this packet, if the payload format permits this.)
      if (isTooBigForAPacket(frameSize)
          && (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) { // send part of this frame now
        // We need to fragment this frame, and use some of it now:
        overflowBytes = computeOverflowForNewFrame(frameSize);
        numFrameBytesToUse -= overflowBytes;
        fCurFragmentationOffset += numFrameBytesToUse;
      } else {
        // We don't use any of this frame now:
        overflowBytes = frameSize;
        numFrameBytesToUse = 0;
      }
      fOutBuf->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse, // record the overflow position and size, so the packet can be adjusted later
                   overflowBytes, presentationTime, durationInMicroseconds);
    } else if (fCurFragmentationOffset > 0) {
      // This is the last fragment of a frame that was fragmented over
      // more than one packet.  Do any special handling for this case:
      fCurFragmentationOffset = 0;
      fPreviousFrameEndedFragmentation = True;
    }
  }

  if (numFrameBytesToUse == 0 && frameSize > 0) { // the packet has been filled, so send the RTP packet now
    // Send our packet now, because we have filled it up:
    sendPacketIfNecessary();
  } else {
    // Use this frame in our outgoing packet:
    unsigned char* frameStart = fOutBuf->curPtr();
    fOutBuf->increment(numFrameBytesToUse);
        // do this now, in case "doSpecialFrameHandling()" calls "setFramePadding()" to append padding bytes

    // Here's where any payload format specific processing gets done:
    doSpecialFrameHandling(curFragmentationOffset, frameStart,
               numFrameBytesToUse, presentationTime,
               overflowBytes);

    ++fNumFramesUsedSoFar;

    // Update the time at which the next packet should be sent, based
    // on the duration of the frame that we just packed into it.
    // However, if this frame has overflow data remaining, then don't
    // count its duration yet.
    if (overflowBytes == 0) {
      fNextSendTime.tv_usec += durationInMicroseconds;
      fNextSendTime.tv_sec += fNextSendTime.tv_usec/1000000;
      fNextSendTime.tv_usec %= 1000000;
    }

    // Send our packet now if (i) it's already at our preferred size, or
    // (ii) (heuristic) another frame of the same size as the one we just
    //      read would overflow the packet, or
    // (iii) it contains the last fragment of a fragmented frame, and we
    //      don't allow anything else to follow this or
    // (iv) one frame per packet is allowed:
    if (fOutBuf->isPreferredSize()
        || fOutBuf->wouldOverflow(numFrameBytesToUse)
        || (fPreviousFrameEndedFragmentation &&
            !allowOtherFramesAfterLastFragment())
        || !frameCanAppearAfterPacketStart(fOutBuf->curPtr() - frameSize,
                       frameSize) ) {
      // The packet is ready to be sent now
      sendPacketIfNecessary();
    } else {
      // There's room for more frames; try getting another:
      packFrame(); // call packFrame to read more data
    }
  }
}

After MultiFramedRTPSink has packed as many frames as possible into the packet, it calls sendPacketIfNecessary to send it to the client:

void MultiFramedRTPSink::sendPacketIfNecessary() {
  if (fNumFramesUsedSoFar > 0) {
    // Send the packet:
#ifdef TEST_LOSS
    if ((our_random()%10) != 0) // simulate 10% packet loss #####
#endif
      if (!fRTPInterface.sendPacket(fOutBuf->packet(), fOutBuf->curPacketSize())) { // send the RTP packet via the RTPInterface
        // if failure handler has been specified, call it
        if (fOnSendErrorFunc != NULL) (*fOnSendErrorFunc)(fOnSendErrorData);
      }
    ++fPacketCount;
    fTotalOctetCount += fOutBuf->curPacketSize();
    fOctetCount += fOutBuf->curPacketSize()
      - rtpHeaderSize - fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes;

    ++fSeqNo; // for next time
  }

  if (fOutBuf->haveOverflowData() // there is unsent frame data, so adjust the packet buffer
      && fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize()/2) {
    // Efficiency hack: Reset the packet start pointer to just in front of
    // the overflow data (allowing for the RTP header and special headers),
    // so that we probably don't have to "memmove()" the overflow data
    // into place when building the next packet:
    unsigned newPacketStart = fOutBuf->curPacketSize()
      - (rtpHeaderSize + fSpecialHeaderSize + frameSpecificHeaderSize());
    fOutBuf->adjustPacketStart(newPacketStart);
  } else {
    // Normal case: Reset the packet start pointer back to the start:
    fOutBuf->resetPacketStart();
  }
  fOutBuf->resetOffset();
  fNumFramesUsedSoFar = 0;

  if (fNoFramesLeft) {
    // We're done:
    onSourceClosure();
  } else {
    // We have more frames left to send.  Figure out when the next frame
    // is due to start playing, then make sure that we wait this long before
    // sending the next packet.
    struct timeval timeNow;
    gettimeofday(&timeNow, NULL);
    int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec;
    int64_t uSecondsToGo = secsDiff*1000000 + (fNextSendTime.tv_usec - timeNow.tv_usec);
    if (uSecondsToGo < 0 || secsDiff < 0) { // sanity check: Make sure that the time-to-delay is non-negative:
      uSecondsToGo = 0;
    }

    // Delay this amount of time, then send the next RTP packet:
    nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo, (TaskFunc*)sendNext, this);
  }
}

// The following is called after each delay between packet sends:
void MultiFramedRTPSink::sendNext(void* firstArg) {
  MultiFramedRTPSink* sink = (MultiFramedRTPSink*)firstArg;
  sink->buildAndSendPacket(False); // loop back into buildAndSendPacket
}

The chain of calls above is rather tangled, so here is a figure showing the flow more clearly:

[Figure: the call flow from establishing the RTSP connection to sending RTP packets]

The server reads data through the RTPSink; the RTPSink in turn reads data through the FramedSource; when a read completes, the data is handed to the RTPSink for processing; and after processing, the RTPSink reads from the FramedSource again. This loop between RTPSink and FramedSource is Live555's overall flow for reading and sending data.
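Stripped of all Live555 detail, that loop has roughly this shape (a minimal sketch of the pattern only, not library code; real Live555 paces the sends through scheduleDelayedTask rather than re-reading immediately):

struct Source {
  // Ask for one frame; 'after' is invoked once the frame has been delivered.
  virtual void getNextFrame(void (*after)(void*), void* ctx) = 0;
};

struct Sink {
  Source* src;
  void continuePlaying() {
    src->getNextFrame(&Sink::afterGetting, this); // ask the source for one frame
  }
  static void afterGetting(void* ctx) {
    Sink* sink = (Sink*)ctx;
    // ... packetize the frame and send it to the client ...
    sink->continuePlaying(); // then ask for the next frame: the loop closes here
  }
};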

That completes the flow from establishing the RTSP connection to sending RTP data (using an H.264 file as the example). Stopping the stream and tearing down the connection afterwards are not covered here.

Author: 昨夜星辰
