上一篇我们简单分析了testOnDemandRTSPServer.cpp的main函数,主要步骤是创建RTSPServer,创建ServerMediaSession对象,然后等待RTSP客户端的连接。接下来我们分析一下Live555中建立RTSP连接的详细过程,首先我们需要简单了解一下RTSP协议建立连接的过程:
1.(可选)
RTSP客户端 —> RTSP服务器端 OPTIONS命令 询问服务器端有哪些方法可使用
RTSP服务器端 —> RTSP客户端 回复OPTIONS命令 回复客户端服务器支持的方法
2. (可选)
RTSP客户端 —> RTSP服务器端 DESCRIBE命令 请求对某个媒体资源(Live555中用ServerMediaSession表示)的描述信息
RTSP服务器端 —> RTSP客户端 回复DESCRIBE命令 回复客户端某个媒体资源的描述信息(即SDP)
3. (必选)
RTSP客户端 —> RTSP服务器端 SETUP命令 请求建立对某个媒体资源的连接
RTSP服务器端 —> RTSP客户端 回复SETUP命令 回复建立连接的结果
4. (必选)
RTSP客户端 —> RTSP服务器端 PLAY命令 请求播放媒体资源
RTSP服务器端 —> RTSP客户端 回复PLAY命令 回复播放的结果
--------------------RTSP服务器端发送RTP包(封装了数据)给RTSP客户端-------------------------------
下面我们从RTSPServer::incomingConnectionHandlerRTSP函数开始,在incomingConnectionHandlerRTSP函数中又调用了RTSPServer::incomingConnectionHandler函数,在这个函数中accept客户端的TCP连接,然后调用RTSPServer::createNewClientConnection函数创建一个RTSPClientConnection实例,该实例表示一个与客户端的RTSP连接。
1 RTSPServer::RTSPClientConnection
2 ::RTSPClientConnection(RTSPServer& ourServer, int clientSocket, struct sockaddr_in clientAddr)
3 : fOurServer(ourServer), fIsActive(True),
4 fClientInputSocket(clientSocket), fClientOutputSocket(clientSocket), fClientAddr(clientAddr),
5 fRecursionCount(0), fOurSessionCookie(NULL) {
6 // Add ourself to our 'client connections' table: 把这个RTSPClientConnection实例添加到RTSPServer的列表中
7 fOurServer.fClientConnections->Add((char const*)this, this);
8
9 // Arrange to handle incoming requests:
10 resetRequestBuffer();
11 envir().taskScheduler().setBackgroundHandling(fClientInputSocket, SOCKET_READABLE|SOCKET_EXCEPTION,
12 (TaskScheduler::BackgroundHandlerProc*)&incomingRequestHandler, this);
13 }
RTSPClientConnection的构造函数中,将自己添加到RTSPServer的连接列表中,然后将客户端socket添加到SOCKET SET中,并且设置相应的回调处理函数incomingRequestHandler,然后就开始等待客户端发送命令。服务器端收到客户端的命令即回调RTSPClientConnection::incomingRequestHandler来处理。
在RTSPClientConnection::incomingRequestHandler函数中又调用RTSPClientConnection::incomingRequestHandler1函数,在这个函数中,从客户端socket中读取数据,读取的数据存储在RTSPClientConnection::fRequestBuffer这个数组中,然后调RTSPClientConnection::handleRequestBytes函数处理刚才读到的数据。handleRequestBytes函数的内容(比较多)主要是分析读取的数据,提取出命令名等数据,然后根据不同的命令调用不同的函数去处理,将处理后的结果保存在fResponseBuffer这个数组中,然后发送给客户端。在此,我们假设客户端跳过OPTINS命令,直接发送DESCRIBE命令请求建立连接,则在handleRequestBytes函数中会调用RTSPClientConnection::handleCmd_DESCRIBE函数来处理,下面来看一下handleCmd_DESCRIBE函数。先说一下urlPreSuffix和urlSuffix吧,假设客户端请求媒体资源的RTSP地址是rtsp://127.0.0.1:8554/test1/test2/test.264,urlPreSuffix表示的是ip:port之后(不含紧跟的“/”)到最后一个“/”之前的部分,即test1/test2,urlSuffix表示的是最后一个“/”之后(不含紧跟的“/”)的内容,即test.264。
1 void RTSPServer::RTSPClientConnection
2 ::handleCmd_DESCRIBE(char const* urlPreSuffix, char const* urlSuffix, char const* fullRequestStr) {
3 char* sdpDescription = NULL;
4 char* rtspURL = NULL;
5 do {
6 char urlTotalSuffix[RTSP_PARAM_STRING_MAX];
7 if (strlen(urlPreSuffix) + strlen(urlSuffix) + 2 > sizeof urlTotalSuffix) {
8 handleCmd_bad();
9 break;
10 }
11 urlTotalSuffix[0] = '\0'; // 拼接urlPreSuffix和urlSuffix,保存在urlTotalSuffix中
12 if (urlPreSuffix[0] != '\0') {
13 strcat(urlTotalSuffix, urlPreSuffix);
14 strcat(urlTotalSuffix, "/");
15 }
16 strcat(urlTotalSuffix, urlSuffix);
17
18 if (!authenticationOK("DESCRIBE", urlTotalSuffix, fullRequestStr)) break;
19
20 // We should really check that the request contains an "Accept:" #####
21 // for "application/sdp", because that's what we're sending back #####
22
23 // Begin by looking up the "ServerMediaSession" object for the specified "urlTotalSuffix":
24 ServerMediaSession* session = fOurServer.lookupServerMediaSession(urlTotalSuffix); // 在RTSPServer中查找对应的ServerMediaSession
25 if (session == NULL) {
26 handleCmd_notFound();
27 break;
28 }
29
30 // Then, assemble a SDP description for this session:
31 sdpDescription = session->generateSDPDescription(); // 产生SDP描述信息字符串
32 if (sdpDescription == NULL) {
33 // This usually means that a file name that was specified for a
34 // "ServerMediaSubsession" does not exist.
35 setRTSPResponse("404 File Not Found, Or In Incorrect Format");
36 break;
37 }
38 unsigned sdpDescriptionSize = strlen(sdpDescription);
39
40 // Also, generate our RTSP URL, for the "Content-Base:" header
41 // (which is necessary to ensure that the correct URL gets used in subsequent "SETUP" requests).
42 rtspURL = fOurServer.rtspURL(session, fClientInputSocket);
43
44 snprintf((char*)fResponseBuffer, sizeof fResponseBuffer, // 构造回复信息
45 "RTSP/1.0 200 OK\r\nCSeq: %s\r\n"
46 "%s"
47 "Content-Base: %s/\r\n"
48 "Content-Type: application/sdp\r\n"
49 "Content-Length: %d\r\n\r\n"
50 "%s",
51 fCurrentCSeq,
52 dateHeader(),
53 rtspURL,
54 sdpDescriptionSize,
55 sdpDescription);
56 } while (0);
57
58 59
60 delete[] sdpDescription;
61 delete[] rtspURL;
62 }
在handleCmd_DESCRIBE函数中,主要调用了ServerMediaSession::generateSDPDescription函数产生SDP信息,ServerMediaSession的SDP信息由每个ServerMediaSubsession的SDP信息构成,然后将产生的SDP回复给客户端。我们就来看一下generateSDPDescription函数。
1 char* ServerMediaSession::generateSDPDescription() {
2 AddressString ipAddressStr(ourIPAddress(envir()));
3 unsigned ipAddressStrSize = strlen(ipAddressStr.val());
4
5 // For a SSM sessions, we need a "a=source-filter: incl ..." line also:
6 char* sourceFilterLine;
7 if (fIsSSM) {
8 char const* const sourceFilterFmt =
9 "a=source-filter: incl IN IP4 * %s\r\n"
10 "a=rtcp-unicast: reflection\r\n";
11 unsigned const sourceFilterFmtSize = strlen(sourceFilterFmt) + ipAddressStrSize + 1;
12
13 sourceFilterLine = new char[sourceFilterFmtSize];
14 sprintf(sourceFilterLine, sourceFilterFmt, ipAddressStr.val());
15 } else {
16 sourceFilterLine = strDup("");
17 }
18
19 char* rangeLine = NULL; // for now
20 char* sdp = NULL; // for now
21
22 do {
23 // Count the lengths of each subsession's media-level SDP lines.
24 // (We do this first, because the call to "subsession->sdpLines()"
25 // causes correct subsession 'duration()'s to be calculated later.)
//首先调用每个ServerMediaSubsession的sdpLines函数,用来计算sdp的长度
26 unsigned sdpLength = 0;
27 ServerMediaSubsession* subsession;
28 for (subsession = fSubsessionsHead; subsession != NULL;
29 subsession = subsession->fNext) {
30 char const* sdpLines = subsession->sdpLines();
31 if (sdpLines == NULL) continue; // the media's not available
32 sdpLength += strlen(sdpLines);
33 }
34 if (sdpLength == 0) break; // the session has no usable subsessions
35
36 // Unless subsessions have differing durations, we also have a "a=range:" line:
// 计算ServerMediaSession的持续时间,该返回值影响a=range字段,该字段决定了该媒体资源是否可以执行快进、快退、任意进度点播,ServerMediaSession的duration由各个ServerMediaSubsession的duration决定。
// ServerMediaSubsession的duration默认实现是返回0,Live555只对部分格式的媒体文件实现了duration函数,如MKV文件的MatroskaFileServerMediaSubsession分析了mkv文件的播放时长
37 float dur = duration(); 38 if (dur == 0.0) {
39 rangeLine = strDup("a=range:npt=0-\r\n");
40 } else if (dur > 0.0) {
41 char buf[100];
42 sprintf(buf, "a=range:npt=0-%.3f\r\n", dur);
43 rangeLine = strDup(buf);
44 } else { // subsessions have differing durations, so "a=range:" lines go there
45 rangeLine = strDup("");
46 }
47
48 char const* const sdpPrefixFmt =
49 "v=0\r\n"
50 "o=- %ld%06ld %d IN IP4 %s\r\n"
51 "s=%s\r\n"
52 "i=%s\r\n"
53 "t=0 0\r\n"
54 "a=tool:%s%s\r\n"
55 "a=type:broadcast\r\n"
56 "a=control:*\r\n"
57 "%s"
58 "%s"
59 "a=x-qt-text-nam:%s\r\n"
60 "a=x-qt-text-inf:%s\r\n"
61 "%s";
62 sdpLength += strlen(sdpPrefixFmt)
63 + 20 + 6 + 20 + ipAddressStrSize
64 + strlen(fDescriptionSDPString)
65 + strlen(fInfoSDPString)
66 + strlen(libNameStr) + strlen(libVersionStr)
67 + strlen(sourceFilterLine)
68 + strlen(rangeLine)
69 + strlen(fDescriptionSDPString)
70 + strlen(fInfoSDPString)
71 + strlen(fMiscSDPLines);
72 sdpLength += 1000; // in case the length of the "subsession->sdpLines()" calls below change
73 sdp = new char[sdpLength];
74 if (sdp == NULL) break;
75
76 // Generate the SDP prefix (session-level lines):
77 snprintf(sdp, sdpLength, sdpPrefixFmt,
78 fCreationTime.tv_sec, fCreationTime.tv_usec, // o= <session id>
79 1, // o= <version> // (needs to change if params are modified)
80 ipAddressStr.val(), // o= <address>
81 fDescriptionSDPString, // s= <description>
82 fInfoSDPString, // i= <info>
83 libNameStr, libVersionStr, // a=tool:
84 sourceFilterLine, // a=source-filter: incl (if a SSM session)
85 rangeLine, // a=range: line
86 fDescriptionSDPString, // a=x-qt-text-nam: line
87 fInfoSDPString, // a=x-qt-text-inf: line
88 fMiscSDPLines); // miscellaneous session SDP lines (if any)
89
90 // Then, add the (media-level) lines for each subsession:
// 再次调用每个ServerMediaSubsession的sdpLines函数,这次真正将每个ServerMediaSubsession的sdp信息添加到ServerMediaSession的SDP信息中
91 char* mediaSDP = sdp;
92 for (subsession = fSubsessionsHead; subsession != NULL;
93 subsession = subsession->fNext) {
94 unsigned mediaSDPLength = strlen(mediaSDP);
95 mediaSDP += mediaSDPLength; // 指针后移
96 sdpLength -= mediaSDPLength;
97 if (sdpLength <= 1) break; // the SDP has somehow become too long
98
99 char const* sdpLines = subsession->sdpLines();
100 if (sdpLines != NULL) snprintf(mediaSDP, sdpLength, "%s", sdpLines);
101 }
102 } while (0);
103
104 delete[] rangeLine; delete[] sourceFilterLine;
105 return sdp;
106 }
到此,服务器端将客户端请求的SDP信息发送给客户端,然后等着客户端发送下一个命令(SETUP命令),在分析服务器端如何处理SETUP命令之前,我们继续深入看一下服务器端是如何获得SDP信息的。从generateSDPDescription函数中可以看到,主要是调用了每个ServerMediaSubsession的sdpLines函数,默认实现在OnDemandServerMediaSubsession这个类中,下面我们就来看看OnDemandServerMediaSubsession::sdpLines函数。
1 char const* OnDemandServerMediaSubsession::sdpLines() {
2 if (fSDPLines == NULL) {
3 // We need to construct a set of SDP lines that describe this
4 // subsession (as a unicast stream). To do so, we first create
5 // dummy (unused) source and "RTPSink" objects,
6 // whose parameters we use for the SDP lines:
// 这几句话的意思是说,为了获得这个ServerMediaSubsession的sdp信息,我们先创建“虚设的”FramedSource和RTPSink来分析出sdp信息,并非正式的开始播放
7 unsigned estBitrate;
// 创建FramedSource对象,用来获取数据
//(这里实际调用的是子类H264VideoFileServerMediaSubsession的createNewStreamSource函数,创建的是ByteStreamFileSource,ByteStreamFileSource是FramedSource的子类)
8 FramedSource* inputSource = createNewStreamSource(0, estBitrate); // 创建FramedSource对象,获取视频帧数据
9 if (inputSource == NULL) return NULL; // file not found
10
11 struct in_addr dummyAddr;
12 dummyAddr.s_addr = 0;
13 Groupsock dummyGroupsock(envir(), dummyAddr, 0, 0);
14 unsigned char rtpPayloadType = 96 + trackNumber()-1; // if dynamic
// 创建RTPSink对象,用来保存RTP数据包(这里实际调用的是子类H264VideoFileServerMediaSubsession的createNewRTPSink函数,创建的是H264VideoRTPSink对象,H264VideoRTPSink是RTPSink的子类)
15 RTPSink* dummyRTPSink
16 = createNewRTPSink(&dummyGroupsock, rtpPayloadType, inputSource);
17 if (dummyRTPSink != NULL && dummyRTPSink->estimatedBitrate() > 0) estBitrate = dummyRTPSink->estimatedBitrate();
18
19 setSDPLinesFromRTPSink(dummyRTPSink, inputSource, estBitrate); // 通过RTPSink对象获得ServerMediaSubsession的sdp信息
20 Medium::close(dummyRTPSink);
21 closeStreamSource(inputSource);
22 }
23
24 return fSDPLines;
25 }
我们再转到OnDemandServerMediaSubsession::setSDPLinesFromRTPSink函数,在这个函数中,我们通过创建的FramedSource对象和RTPSink对象将文件播放一段以便产生出sdp信息。在此,我要插一下Live555 RTSPServer播放媒体资源的一个大体流程:RTSPServer使用RTPSink获得和保存RTP包,RTPSink不断地向FramedSource请求帧数据,FramedSource取得帧数据后就调用回调函数把数据给RTPSink处理,RTPSink在回调函数中将数据发送给客户端(也可以保存在本地存成文件,即录像的功能)
1 void OnDemandServerMediaSubsession
2 ::setSDPLinesFromRTPSink(RTPSink* rtpSink, FramedSource* inputSource, unsigned estBitrate) {
3 if (rtpSink == NULL) return;
4
//通过RTPSink获取各种关于该ServerMediaSubsession的信息,最主要的是获取auxSDPLine
5 char const* mediaType = rtpSink->sdpMediaType();
6 unsigned char rtpPayloadType = rtpSink->rtpPayloadType();
7 AddressString ipAddressStr(fServerAddressForSDP);
8 char* rtpmapLine = rtpSink->rtpmapLine();
9 char const* rtcpmuxLine = fMultiplexRTCPWithRTP ? "a=rtcp-mux\r\n" : "";
10 char const* rangeLine = rangeSDPLine();
11 char const* auxSDPLine = getAuxSDPLine(rtpSink, inputSource);
12 if (auxSDPLine == NULL) auxSDPLine = "";
13
14 char const* const sdpFmt =
15 "m=%s %u RTP/AVP %d\r\n"
16 "c=IN IP4 %s\r\n"
17 "b=AS:%u\r\n"
18 "%s"
19 "%s"
20 "%s"
21 "%s"
22 "a=control:%s\r\n";
23 unsigned sdpFmtSize = strlen(sdpFmt)
24 + strlen(mediaType) + 5 /* max short len */ + 3 /* max char len */
25 + strlen(ipAddressStr.val())
26 + 20 /* max int len */
27 + strlen(rtpmapLine)
28 + strlen(rtcpmuxLine)
29 + strlen(rangeLine)
30 + strlen(auxSDPLine)
31 + strlen(trackId());
32 char* sdpLines = new char[sdpFmtSize];
33 sprintf(sdpLines, sdpFmt,
34 mediaType, // m= <media>
35 fPortNumForSDP, // m= <port>
36 rtpPayloadType, // m= <fmt list>
37 ipAddressStr.val(), // c= address
38 estBitrate, // b=AS:<bandwidth>
39 rtpmapLine, // a=rtpmap:... (if present)
40 rtcpmuxLine, // a=rtcp-mux:... (if present)
41 rangeLine, // a=range:... (if present)
42 auxSDPLine, // optional extra SDP line
43 trackId()); // a=control:<track-id>
44 delete[] (char*)rangeLine; delete[] rtpmapLine;
45
46 fSDPLines = strDup(sdpLines);
47 delete[] sdpLines;
48 }
在setSDPLinesFromRTPSink函数中通过RTPSink对象获得各种信息,最复杂的是获取auxSDPLine的过程,这个函数在H264VideoFileServerMediaSubsession类中被重写了,由于我们现在分析的媒体资源是.264文件,所以我们来看一下H264VideoFileServerMediaSubsession::getAuxSDPLine函数:
1 char const* H264VideoFileServerMediaSubsession::getAuxSDPLine(RTPSink* rtpSink, FramedSource* inputSource) {
2 if (fAuxSDPLine != NULL) return fAuxSDPLine; // it's already been set up (for a previous client)
3
4 if (fDummyRTPSink == NULL) { // we're not already setting it up for another, concurrent stream
5 // Note: For H264 video files, the 'config' information ("profile-level-id" and "sprop-parameter-sets") isn't known
6 // until we start reading the file. This means that "rtpSink"s "auxSDPLine()" will be NULL initially,
7 // and we need to start reading data from our file until this changes.
8 fDummyRTPSink = rtpSink;
9
10 // Start reading the file: //调用RTPSink的startPlaying函数来播放,对于文件型的ServerMediaSubsession,Live555的做法是播放一段文件来获取sdp信息
11 fDummyRTPSink->startPlaying(*inputSource, afterPlayingDummy, this);
12
13 // Check whether the sink's 'auxSDPLine()' is ready:
14 checkForAuxSDPLine(this);
15 }
16
17 envir().taskScheduler().doEventLoop(&fDoneFlag); // fDoneFlag初始值为NULL,让程序在此循环等待,直到成功分析出sdp信息
18
19 return fAuxSDPLine;
20 }
在这个函数中调用RTPSink的startPlaying函数开始读取数据,调用H264VideoFileServerMediaSubsession::checkForAuxSDPLine函数来检查是否已经从读取的数据中分析出sdp信息,看一下checkForAuxSDPLine函数:
1 static void checkForAuxSDPLine(void* clientData) {
2 H264VideoFileServerMediaSubsession* subsess = (H264VideoFileServerMediaSubsession*)clientData;
3 subsess->checkForAuxSDPLine1();
4 }
5
6 void H264VideoFileServerMediaSubsession::checkForAuxSDPLine1() {
7 char const* dasl;
8
9 if (fAuxSDPLine != NULL) { //说明已经分析出了sdp信息
10 // Signal the event loop that we're done:
11 setDoneFlag(); // 使程序退出循环等待
12 } // 还没分析出sdp信息,调用RTPSink的auxSDPLine函数分析sdp信息
else if (fDummyRTPSink != NULL && (dasl = fDummyRTPSink->auxSDPLine()) != NULL) {
13 fAuxSDPLine = strDup(dasl);
14 fDummyRTPSink = NULL;
15
16 // Signal the event loop that we're done:
17 setDoneFlag(); // 分析出了sdp信息,使程序退出循环等待
18 } else if (!fDoneFlag) // 仍然没有分析出sdp信息,则稍后一会儿再执行checkForAuxSDPLine函数
19 // try again after a brief delay:
20 int uSecsToDelay = 100000; // 100 ms
21 nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecsToDelay,
22 (TaskFunc*)checkForAuxSDPLine, this);
23 }
24 }
上面检查发现没有分析出sdp信息后,调用H264VideoRTPSink::auxSDPLine函数再次试图分析出sdp信息,看看auxSDPLine函数:
1 char const* H264VideoRTPSink::auxSDPLine() {
2 // Generate a new "a=fmtp:" line each time, using our SPS and PPS (if we have them),
3 // otherwise parameters from our framer source (in case they've changed since the last time that
4 // we were called):
5 H264or5VideoStreamFramer* framerSource = NULL;
6 u_int8_t* vpsDummy = NULL; unsigned vpsDummySize = 0;
7 u_int8_t* sps = fSPS; unsigned spsSize = fSPSSize;
8 u_int8_t* pps = fPPS; unsigned ppsSize = fPPSSize;
9 if (sps == NULL || pps == NULL) {
10 // We need to get SPS and PPS from our framer source:
// fOurFragmenter在调用startPlaying函数后被创建
11 if (fOurFragmenter == NULL) return NULL; // we don't yet have a fragmenter (and therefore not a source)
12 framerSource = (H264or5VideoStreamFramer*)(fOurFragmenter->inputSource());
13 if (framerSource == NULL) return NULL; // we don't yet have a source
14
15 framerSource->getVPSandSPSandPPS(vpsDummy, vpsDummySize, sps, spsSize, pps, ppsSize);
16 if (sps == NULL || pps == NULL) return NULL; // our source isn't ready
17 }
18
// 已经从文件里面成功读出了数据,接下来就分析sdp信息
19 // Set up the "a=fmtp:" SDP line for this stream:
20 u_int8_t* spsWEB = new u_int8_t[spsSize]; // "WEB" means "Without Emulation Bytes"
21 unsigned spsWEBSize = removeH264or5EmulationBytes(spsWEB, spsSize, sps, spsSize);
22 if (spsWEBSize < 4) { // Bad SPS size => assume our source isn't ready
23 delete[] spsWEB;
24 return NULL;
25 }
26 u_int32_t profileLevelId = (spsWEB[1]<<16) | (spsWEB[2]<<8) | spsWEB[3];
27 delete[] spsWEB;
28
29 char* sps_base64 = base64Encode((char*)sps, spsSize);
30 char* pps_base64 = base64Encode((char*)pps, ppsSize);
31
32 char const* fmtpFmt =
33 "a=fmtp:%d packetization-mode=1"
34 ";profile-level-id=%06X"
35 ";sprop-parameter-sets=%s,%s\r\n";
36 unsigned fmtpFmtSize = strlen(fmtpFmt)
37 + 3 /* max char len */
38 + 6 /* 3 bytes in hex */
39 + strlen(sps_base64) + strlen(pps_base64);
40 char* fmtp = new char[fmtpFmtSize];
41 sprintf(fmtp, fmtpFmt,
42 rtpPayloadType(),
43 profileLevelId,
44 sps_base64, pps_base64);
45
46 delete[] sps_base64;
47 delete[] pps_base64;
48
49 delete[] fFmtpSDPLine; fFmtpSDPLine = fmtp;
50 return fFmtpSDPLine;
51 }
至此,RTSPServer成功地处理了客户端发送来的DESCRIBE命令,将SDP信息回复客户端。然后,客户端对应每个ServerMediaSubsession发送一个SETUP命令请求建立与该ServerMediaSubsession的连接,服务器端收到后会调用RTSPClientSession::handleCmd_SETUP函数来处理SETUP命令。RTSPClientSession类是服务器端用来维护和客户端的一个会话,SETUP命令、PLAY命令、PAUSE命令、TEARDOWN命令等都是在RTSPClientSession中处理的,RTSPClientSession是RTSPClientConnection的内部类,来看一下这个类:
1 // The state of an individual client session (using one or more sequential TCP connections) handled by a RTSP server:
2 class RTSPClientSession {
3 protected:
4 RTSPClientSession(RTSPServer& ourServer, u_int32_t sessionId);
5 virtual ~RTSPClientSession();
6
7 friend class RTSPServer;
8 friend class RTSPClientConnection;
9 // Make the handler functions for each command virtual, to allow subclasses to redefine them:
10 virtual void handleCmd_SETUP(RTSPClientConnection* ourClientConnection, // 处理SETUP命令
11 char const* urlPreSuffix, char const* urlSuffix, char const* fullRequestStr);
12 virtual void handleCmd_withinSession(RTSPClientConnection* ourClientConnection,
13 char const* cmdName,
14 char const* urlPreSuffix, char const* urlSuffix,
15 char const* fullRequestStr);
16 virtual void handleCmd_TEARDOWN(RTSPClientConnection* ourClientConnection, // 处理TEARDOWN命令(结束会话)
17 ServerMediaSubsession* subsession);
18 virtual void handleCmd_PLAY(RTSPClientConnection* ourClientConnection, // 处理PLAY命令
19 ServerMediaSubsession* subsession, char const* fullRequestStr);
20 virtual void handleCmd_PAUSE(RTSPClientConnection* ourClientConnection, // 处理PAUSE命令
21 ServerMediaSubsession* subsession);
22 virtual void handleCmd_GET_PARAMETER(RTSPClientConnection* ourClientConnection,
23 ServerMediaSubsession* subsession, char const* fullRequestStr);
24 virtual void handleCmd_SET_PARAMETER(RTSPClientConnection* ourClientConnection,
25 ServerMediaSubsession* subsession, char const* fullRequestStr);
26 protected:
27 UsageEnvironment& envir() { return fOurServer.envir(); }
28 void reclaimStreamStates();
29 Boolean isMulticast() const { return fIsMulticast; }
30 void noteLiveness();
31 static void noteClientLiveness(RTSPClientSession* clientSession); // 与客户端的心跳
32 static void livenessTimeoutTask(RTSPClientSession* clientSession);
33
34 // Shortcuts for setting up a RTSP response (prior to sending it):
35 void setRTSPResponse(RTSPClientConnection* ourClientConnection, char const* responseStr) { ourClientConnection->setRTSPResponse(responseStr); }
36 void setRTSPResponse(RTSPClientConnection* ourClientConnection, char const* responseStr, u_int32_t sessionId) { ourClientConnection->setRTSPResponse(responseStr, sessionId); }
37 void setRTSPResponse(RTSPClientConnection* ourClientConnection, char const* responseStr, char const* contentStr) { ourClientConnection->setRTSPResponse(responseStr, contentStr); }
38 void setRTSPResponse(RTSPClientConnection* ourClientConnection, char const* responseStr, u_int32_t sessionId, char const* contentStr) { ourClientConnection->setRTSPResponse(responseStr, sessionId, contentStr); }
39
40 protected:
41 RTSPServer& fOurServer;
42 u_int32_t fOurSessionId;
43 ServerMediaSession* fOurServerMediaSession;
44 Boolean fIsMulticast, fStreamAfterSETUP;
45 unsigned char fTCPStreamIdCount; // used for (optional) RTP/TCP
46 Boolean usesTCPTransport() const { return fTCPStreamIdCount > 0; }
47 TaskToken fLivenessCheckTask;
48 unsigned fNumStreamStates; // streamState对象的数目
49 struct streamState { // streamState结构体,保存一个请求的ServerMediaSubsession以及对应的StreamState对象
50 ServerMediaSubsession* subsession;
51 void* streamToken; // streamToken指向一个StreamState对象
52 } * fStreamStates; // fStreamStates是streamState数组
53 };
54
/* StreamState类从名字上就可以看出是服务器端用来保存对某个ServerMediaSubsession的流化的状态(包括serverRTPPort、serverRTCPPort、rtpSink、mediaSource等)
当某个ServerMediaSubsession被客户端请求SETUP时,服务器端会创建一个StreamState对象,并创建相关的服务器端socket、RTPSink、FramedSource为后面的播放做好准备,
在创建一个ServerMediaSubsession对象时(详情见testOnDemandRTSPServer.cpp的main函数),会传入reuseFirstSource这个参数。如果reuseFirstSource为true,
则表示对于请求该ServerMediaSubsession的所有客户端都使用同一个StreamState对象,即服务器端使用同一个RTP端口、RTCP端口、RTPSink、FramedSource来为请求该
ServerMediaSubsession的多个客户端服务(一对多,节省服务器端资源);而如果reuseFirstSource为false,则服务器端为每个对ServerMediaSubsession的请求创建一个StreamState对象(多对多,需要占用服务器端较多资源) */
55 class StreamState { // StreamState类,表示服务器端对一个ServerMediaSubsession的一次流化,并保存相关状态
56 public:
57 StreamState(OnDemandServerMediaSubsession& master,
58 Port const& serverRTPPort, Port const& serverRTCPPort,
59 RTPSink* rtpSink, BasicUDPSink* udpSink,
60 unsigned totalBW, FramedSource* mediaSource,
61 Groupsock* rtpGS, Groupsock* rtcpGS);
62 virtual ~StreamState();
63
64 void startPlaying(Destinations* destinations, // 开始播放,服务器端在收到PLAY命令后,就是调用各个StreamState的startPlaying函数来开始播放一个ServerMediaSubsession
65 TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
66 ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
67 void* serverRequestAlternativeByteHandlerClientData);
68 void pause();
69 void endPlaying(Destinations* destinations); // 结束播放
70 void reclaim();
71
72 unsigned& referenceCount() { return fReferenceCount; } // 引用该路流的客户端数目
73
74 Port const& serverRTPPort() const { return fServerRTPPort; }
75 Port const& serverRTCPPort() const { return fServerRTCPPort; }
76
77 RTPSink* rtpSink() const { return fRTPSink; }
78
79 float streamDuration() const { return fStreamDuration; }
80
81 FramedSource* mediaSource() const { return fMediaSource; }
82 float& startNPT() { return fStartNPT; }
83
84 private:
85 OnDemandServerMediaSubsession& fMaster;
86 Boolean fAreCurrentlyPlaying;
87 unsigned fReferenceCount;
88
89 Port fServerRTPPort, fServerRTCPPort;
90
91 RTPSink* fRTPSink;
92 BasicUDPSink* fUDPSink;
93
94 float fStreamDuration;
95 unsigned fTotalBW;
96 RTCPInstance* fRTCPInstance;
97
98 FramedSource* fMediaSource;
99 float fStartNPT; // initial 'normal play time'; reset after each seek
100
101 Groupsock* fRTPgs;
102 Groupsock* fRTCPgs;
103 };
接下来,在handleCmd_SETUP函数中,服务器首先找到客户端请求的ServerMediaSession,再找到客户端请求的ServerMediaSubsession,然后从客户端的请求中获取一些客户端参数(如:客户端的RTP端口、RTCP端口),最后调用OnDemandServerMediaSubsession::getStreamParameters函数创建RTP连接和RTCP连接。看一下getStreamParameters函数:
1 void OnDemandServerMediaSubsession
2 ::getStreamParameters(unsigned clientSessionId,netAddressBits clientAddress,Port const& clientRTPPort,
3 Port const& clientRTCPPort,int tcpSocketNum,unsigned char rtpChannelId,
4 unsigned char rtcpChannelId,netAddressBits& destinationAddress,u_int8_t& /*destinationTTL*/,
5 Boolean& isMulticast,Port& serverRTPPort,Port& serverRTCPPort,void*& streamToken) {
6 if (destinationAddress == 0) destinationAddress = clientAddress;
7 struct in_addr destinationAddr; destinationAddr.s_addr = destinationAddress;
8 isMulticast = False;
9
10 if (fLastStreamToken != NULL && fReuseFirstSource) { // 如果fReuseFirstSource为true,则使用之前已经创建的StreamState对象
11 // Special case: Rather than creating a new 'StreamState',
12 // we reuse the one that we've already created:
13 serverRTPPort = ((StreamState*)fLastStreamToken)->serverRTPPort();
14 serverRTCPPort = ((StreamState*)fLastStreamToken)->serverRTCPPort();
15 ++((StreamState*)fLastStreamToken)->referenceCount();
16 streamToken = fLastStreamToken;
17 } else { // 对于该ServerMediaSubsession尚未创建StreamState对象,或者fReuseFirstSource为false
18 // Normal case: Create a new media source:
19 unsigned streamBitrate;
20 FramedSource* mediaSource // 创建FramedSource对象,实际调用的是子类的createNewStreamSource函数,对应H264VideoFileServerMediaSubsession创建的是ByteStreamFileSource
21 = createNewStreamSource(clientSessionId, streamBitrate);
22
23 // Create 'groupsock' and 'sink' objects for the destination,
24 // using previously unused server port numbers:
25 RTPSink* rtpSink = NULL;
26 BasicUDPSink* udpSink = NULL;
27 Groupsock* rtpGroupsock = NULL;
28 Groupsock* rtcpGroupsock = NULL;
29
30 if (clientRTPPort.num() != 0 || tcpSocketNum >= 0) { // Normal case: Create destinations
31 portNumBits serverPortNum;
32 if (clientRTCPPort.num() == 0) {
33 // We're streaming raw UDP (not RTP). Create a single groupsock:
34 NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
35 for (serverPortNum = fInitialPortNum; ; ++serverPortNum) {
36 struct in_addr dummyAddr; dummyAddr.s_addr = 0;
37
38 serverRTPPort = serverPortNum;
39 rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
40 if (rtpGroupsock->socketNum() >= 0) break; // success
41 }
42
43 udpSink = BasicUDPSink::createNew(envir(), rtpGroupsock);
44 } else { // 创建两个服务器端socket用来传输RTP包和RTCP包,对应rtpGroupsock和rtcpGroupsock
45 // Normal case: We're streaming RTP (over UDP or TCP). Create a pair of
46 // groupsocks (RTP and RTCP), with adjacent port numbers (RTP port number even).
47 // (If we're multiplexing RTCP and RTP over the same port number, it can be odd or even.)
48 NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
49 for (portNumBits serverPortNum = fInitialPortNum; ; ++serverPortNum) {
50 struct in_addr dummyAddr; dummyAddr.s_addr = 0;
51
52 serverRTPPort = serverPortNum;
53 rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
54 if (rtpGroupsock->socketNum() < 0) {
55 delete rtpGroupsock;
56 continue; // try again
57 }
58
59 if (fMultiplexRTCPWithRTP) {
60 // Use the RTP 'groupsock' object for RTCP as well:
61 serverRTCPPort = serverRTPPort;
62 rtcpGroupsock = rtpGroupsock;
63 } else {
64 // Create a separate 'groupsock' object (with the next (odd) port number) for RTCP:
65 serverRTCPPort = ++serverPortNum;
66 rtcpGroupsock = new Groupsock(envir(), dummyAddr, serverRTCPPort, 255);
67 if (rtcpGroupsock->socketNum() < 0) {
68 delete rtpGroupsock;
69 delete rtcpGroupsock;
70 continue; // try again
71 }
72 }
73
74 break; // success
75 }
76
77 unsigned char rtpPayloadType = 96 + trackNumber()-1; // if dynamic
//创建RTPSink,实际调用的是子类的createNewRTPSink函数,对应H264VideoFileServerMediaSubsession创建的是H264VideoRTPSink
78 rtpSink = createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource);
79 if (rtpSink != NULL && rtpSink->estimatedBitrate() > 0) streamBitrate = rtpSink->estimatedBitrate(); 80 }
81
82 // Turn off the destinations for each groupsock. They'll get set later
83 // (unless TCP is used instead):
84
85 if (rtpGroupsock != NULL) rtpGroupsock->removeAllDestinations();
86 if (rtcpGroupsock != NULL) rtcpGroupsock->removeAllDestinations();
87
88 if (rtpGroupsock != NULL) {
89 // Try to use a big send buffer for RTP - at least 0.1 second of
90 // specified bandwidth and at least 50 KB
91
92 unsigned rtpBufSize = streamBitrate * 25 / 2; // 1 kbps * 0.1 s = 12.5 bytes
93 if (rtpBufSize < 50 * 1024) rtpBufSize = 50 * 1024;
94 increaseSendBufferTo(envir(), rtpGroupsock->socketNum(), rtpBufSize);
95 }
96 }
97
98 // Set up the state of the stream. The stream will get started later:
99 streamToken = fLastStreamToken
100 = new StreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink,
101 streamBitrate, mediaSource,
102 rtpGroupsock, rtcpGroupsock);
103 }
104
105 // Record these destinations as being for this client session id:
106 Destinations* destinations;
107 if (tcpSocketNum < 0) { // UDP
108 destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort);
109 } else { // TCP
110 destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId);
111 }
112 fDestinationsHashTable->Add((char const*)clientSessionId, destinations);
113 }
复制代码
经过上面的步骤后,服务器端就已经准备好向客户端传送RTP包以及RTCP包了,等待客户端发送PLAY命令后开始传输。服务器端收到PLAY命令后,调用RTSPClientSession::handleCmd_PLAY函数处理。在handleCmd_PLAY函数中,首先提取Scale,表示客户端期望的播放速度(正常、快进、快退),然后提取Range,表示客户端期望的播放起止范围,根据这两个参数分别调用ServerMediaSubsession::setStreamScale函数和ServerMediaSubsession::seekStream函数,最后调用ServerMediaSubsession::startStream函数开始传输数据。实际调用的是OnDemandServerMediaSubsession::startStream函数,看一下这个函数的内容:
复制代码
1 void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId,void* streamToken,
2 TaskFunc* rtcpRRHandler,void* rtcpRRHandlerClientData,
3 unsigned short& rtpSeqNum,unsigned& rtpTimestamp,
4 ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
5 void* serverRequestAlternativeByteHandlerClientData) {
6 StreamState* streamState = (StreamState*)streamToken;
7 Destinations* destinations
8 = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId)); // 查找目的客户端的地址
9 if (streamState != NULL) {
10 streamState->startPlaying(destinations,
11 rtcpRRHandler, rtcpRRHandlerClientData,
12 serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData); //调用StreamState::startPlaying函数开始播放
13
14 RTPSink* rtpSink = streamState->rtpSink(); // alias
15 if (rtpSink != NULL) {
16 rtpSeqNum = rtpSink->currentSeqNo();
17 rtpTimestamp = rtpSink->presetNextTimestamp();
18 }
19 }
20 }
在OnDemandServerMediaSubsessionstartStream函数中,主要是调用了StreamState::startPlaying函数,来看一下这个函数:
1 void StreamState
2 ::startPlaying(Destinations* dests,
3 TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
4 ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
5 void* serverRequestAlternativeByteHandlerClientData) {
6 if (dests == NULL) return;
7
8 if (fRTCPInstance == NULL && fRTPSink != NULL) {
9 // Create (and start) a 'RTCP instance' for this RTP sink:
10 fRTCPInstance
11 = RTCPInstance::createNew(fRTPSink->envir(), fRTCPgs,
12 fTotalBW, (unsigned char*)fMaster.fCNAME,
13 fRTPSink, NULL /* we're a server */);
14 // Note: This starts RTCP running automatically
15 }
16
17 if (dests->isTCP) { // 使用TCP传输RTP包和RTCP包
18 19 // Change RTP and RTCP to use the TCP socket instead of UDP:
20 if (fRTPSink != NULL) {
21 fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
22 RTPInterface
23 ::setServerRequestAlternativeByteHandler(fRTPSink->envir(), dests->tcpSocketNum,
24 serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
25 // So that we continue to handle RTSP commands from the client
26 }
27 if (fRTCPInstance != NULL) {
28 fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
29 fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId,
30 rtcpRRHandler, rtcpRRHandlerClientData);
31 }
32 } else { // 使用UDP传输RTP包和RTCP包
33 34 // Tell the RTP and RTCP 'groupsocks' about this destination
35 // (in case they don't already have it):
36 if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort);
37 if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort);
38 if (fRTCPInstance != NULL) {
39 fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
40 rtcpRRHandler, rtcpRRHandlerClientData);
41 }
42 }
43
44 if (fRTCPInstance != NULL) {
45 // Hack: Send an initial RTCP "SR" packet, before the initial RTP packet, so that receivers will (likely) be able to
46 // get RTCP-synchronized presentation times immediately:
47 fRTCPInstance->sendReport();
48 }
49
50 if (!fAreCurrentlyPlaying && fMediaSource != NULL) { //调用RTPSink::startPlaying函数开始传输数据
51 if (fRTPSink != NULL) {
52 fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
53 fAreCurrentlyPlaying = True;
54 } else if (fUDPSink != NULL) {
55 fUDPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
56 fAreCurrentlyPlaying = True;
57 }
58 }
59 }
在StreamState::startPlaying函数中,将客户端添加到目的客户端列表中去,然后调用RTPSink::startPlaying函数,实际调用的是MediaSink::startPlaying函数:
1 Boolean MediaSink::startPlaying(MediaSource& source,
2 afterPlayingFunc* afterFunc,
3 void* afterClientData) {
4 // Make sure we're not already being played:
5 if (fSource != NULL) {
6 envir().setResultMsg("This sink is already being played");
7 return False;
8 }
9
10 // Make sure our source is compatible:
11 if (!sourceIsCompatibleWithUs(source)) {
12 envir().setResultMsg("MediaSink::startPlaying(): source is not compatible!");
13 return False;
14 }
15 fSource = (FramedSource*)&source;
16
17 fAfterFunc = afterFunc; // 设置好回调函数后,就调用continuePlaying函数开始播放
18 fAfterClientData = afterClientData;
19 return continuePlaying();
20 }
21 /* 这里我们的媒体资源是.264文件,对应的是H264VideoFileServerMediaSubsession,对应H264VideoFileServerMediaSubsession创建的是
H264VideoRTPSink对象,H264VideoRTPSink是H264or5VideoRTPSink的子类,因此上面实际调用的是H264or5VideoRTPSink::continuePlaying函数 */
22 Boolean H264or5VideoRTPSink::continuePlaying() {
23 // First, check whether we have a 'fragmenter' class set up yet.
24 // If not, create it now: 创建一个H264or5Fragmenter对象,
25 if (fOurFragmenter == NULL) {
26 fOurFragmenter = new H264or5Fragmenter(fHNumber, envir(), fSource, OutPacketBuffer::maxSize,
27 ourMaxPacketSize() - 12/*RTP hdr size*/);
28 } else {
29 fOurFragmenter->reassignInputSource(fSource);
30 }
31 fSource = fOurFragmenter; // 注意,此处fSource变成了H264or5Fragmenter对象,H264or5Fragmenter是FramedFilter的子类
32 // FramedFilter是FramedSource的子类,从名字可以看出FramedFilter的作用是对FramedSource送来的数据做一些“过滤”,并且FramedFilter的
结果数据还可以给另外一个FramedFilter做进一步的“过滤”,这里类似于Java中的IO装饰流,使用了装饰模式。
33 // Then call the parent class's implementation:
34 return MultiFramedRTPSink::continuePlaying(); //调用了MultiFramedRTPSink的continuePlaying函数
35 }
1 Boolean MultiFramedRTPSink::continuePlaying() {
2 // Send the first packet.
3 // (This will also schedule any future sends.)
4 buildAndSendPacket(True);
5 return True;
6 }
7
8 void MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket) {
9 fIsFirstPacket = isFirstPacket;
// RTP version 2; marker ('M') bit not set (by default; it can be set later)
10 unsigned rtpHdr = 0x80000000; Set up the RTP header: /设置RTP头部
11 rtpHdr |= (fRTPPayloadType<<16);
12 rtpHdr |= fSeqNo; // sequence number
13 fOutBuf->enqueueWord(rtpHdr);
14 // Note where the RTP timestamp will go.
15 // (We can't fill this in until we start packing payload frames.)
16 fTimestampPosition = fOutBuf->curPacketSize();
17 fOutBuf->skipBytes(4); // leave a hole for the timestamp
18 fOutBuf->enqueueWord(SSRC());
19 // Allow for a special, payload-format-specific header following the
20 // RTP header:
21 fSpecialHeaderPosition = fOutBuf->curPacketSize();
22 fSpecialHeaderSize = specialHeaderSize();
23 fOutBuf->skipBytes(fSpecialHeaderSize);
24
25 // Begin packing as many (complete) frames into the packet as we can:
26 fTotalFrameSpecificHeaderSizes = 0;
27 fNoFramesLeft = False;
28 fNumFramesUsedSoFar = 0;
29 packFrame(); // 调用packFrame函数
30 }
31
32 void MultiFramedRTPSink::packFrame() {
33 // Get the next frame.
34
35 // First, see if we have an overflow frame that was too big for the last pkt
36 if (fOutBuf->haveOverflowData()) {
37 // Use this frame before reading a new one from the source
38 unsigned frameSize = fOutBuf->overflowDataSize();
39 struct timeval presentationTime = fOutBuf->overflowPresentationTime();
40 unsigned durationInMicroseconds = fOutBuf->overflowDurationInMicroseconds();
41 fOutBuf->useOverflowData();
42
43 afterGettingFrame1(frameSize, 0, presentationTime, durationInMicroseconds);
44 } else {
45 // Normal case: we need to read a new frame from the source
46 if (fSource == NULL) return;
47
48 fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize();
49 fCurFrameSpecificHeaderSize = frameSpecificHeaderSize();
50 fOutBuf->skipBytes(fCurFrameSpecificHeaderSize);
51 fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize;
52 fSource->getNextFrame(fOutBuf->curPtr(), fOutBuf->totalBytesAvailable(), // 调用FramedSource::getNextFrame函数获取帧数据保存到fOutBuf中
53 afterGettingFrame, this, ourHandleClosure, this); // 获取后回调MultiFramedRTPSink::afterGettingFrame函数
//对应H264VideoFileServerMediaSubsession,是在H264or5Fragmenter中回调MultiFramedRTPSink::afterGettingFrame函数
54 }
55 }
56
57 void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize,
58 afterGettingFunc* afterGettingFunc,void* afterGettingClientData,
59 onCloseFunc* onCloseFunc,void* onCloseClientData) {
60 // Make sure we're not already being read:
61 if (fIsCurrentlyAwaitingData) {
62 envir() << "FramedSource[" << this << "]::getNextFrame(): attempting to read more than once at the same time!\n";
63 envir().internalError();
64 }
65
66 fTo = to;
67 fMaxSize = maxSize;
68 fNumTruncatedBytes = 0; // by default; could be changed by doGetNextFrame()
69 fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame()
70 fAfterGettingFunc = afterGettingFunc;
71 fAfterGettingClientData = afterGettingClientData;
72 fOnCloseFunc = onCloseFunc;
73 fOnCloseClientData = onCloseClientData;
74 fIsCurrentlyAwaitingData = True;
75
76 doGetNextFrame(); // 调用doGetNextFrame函数
77 }
对应H264VideoFileServerMediaSubsession之前创建的FramedSource是ByteStreamFileSource对象,然后又在ByteStreamFileSource外面套了一个H264or5Fragmenter,因此调用的是H264or5Fragmenter::doGetNextFrame函数:
1 void H264or5Fragmenter::doGetNextFrame() {
2 if (fNumValidDataBytes == 1) {
3 // We have no NAL unit data currently in the buffer. Read a new one:
//在这里又调用了ByteStreamFileSource的doGetNextFrame函数,并且设置回调函数为H264or5Fragmenter::afterGettingFrame函数
4 fInputSource->getNextFrame(&fInputBuffer[1], fInputBufferSize - 1,
5 afterGettingFrame, this,
6 FramedSource::handleClosure, this);
7 } else {
8 // We have NAL unit data in the buffer. There are three cases to consider:
9 // 1. There is a new NAL unit in the buffer, and it's small enough to deliver
10 // to the RTP sink (as is).
11 // 2. There is a new NAL unit in the buffer, but it's too large to deliver to
12 // the RTP sink in its entirety. Deliver the first fragment of this data,
13 // as a FU packet, with one extra preceding header byte (for the "FU header").
14 // 3. There is a NAL unit in the buffer, and we've already delivered some
15 // fragment(s) of this. Deliver the next fragment of this data,
16 // as a FU packet, with two (H.264) or three (H.265) extra preceding header bytes
17 // (for the "NAL header" and the "FU header").
18
19 if (fMaxSize < fMaxOutputPacketSize) { // shouldn't happen
20 envir() << "H264or5Fragmenter::doGetNextFrame(): fMaxSize ("
21 << fMaxSize << ") is smaller than expected\n";
22 } else {
23 fMaxSize = fMaxOutputPacketSize;
24 }
25
26 fLastFragmentCompletedNALUnit = True; // by default
27 if (fCurDataOffset == 1) { // case 1 or 2
28 if (fNumValidDataBytes - 1 <= fMaxSize) { // case 1
29 memmove(fTo, &fInputBuffer[1], fNumValidDataBytes - 1);
30 fFrameSize = fNumValidDataBytes - 1;
31 fCurDataOffset = fNumValidDataBytes;
32 } else { // case 2
33 // We need to send the NAL unit data as FU packets. Deliver the first
34 // packet now. Note that we add "NAL header" and "FU header" bytes to the front
35 // of the packet (overwriting the existing "NAL header").
36 if (fHNumber == 264) {
37 fInputBuffer[0] = (fInputBuffer[1] & 0xE0) | 28; // FU indicator
38 fInputBuffer[1] = 0x80 | (fInputBuffer[1] & 0x1F); // FU header (with S bit)
39 } else { // 265
40 u_int8_t nal_unit_type = (fInputBuffer[1]&0x7E)>>1;
41 fInputBuffer[0] = (fInputBuffer[1] & 0x81) | (49<<1); // Payload header (1st byte)
42 fInputBuffer[1] = fInputBuffer[2]; // Payload header (2nd byte)
43 fInputBuffer[2] = 0x80 | nal_unit_type; // FU header (with S bit)
44 }
45 memmove(fTo, fInputBuffer, fMaxSize);
46 fFrameSize = fMaxSize;
47 fCurDataOffset += fMaxSize - 1;
48 fLastFragmentCompletedNALUnit = False;
49 }
50 } else { // case 3
51 // We are sending this NAL unit data as FU packets. We've already sent the
52 // first packet (fragment). Now, send the next fragment. Note that we add
53 // "NAL header" and "FU header" bytes to the front. (We reuse these bytes that
54 // we already sent for the first fragment, but clear the S bit, and add the E
55 // bit if this is the last fragment.)
56 unsigned numExtraHeaderBytes;
57 if (fHNumber == 264) {
58 fInputBuffer[fCurDataOffset-2] = fInputBuffer[0]; // FU indicator
59 fInputBuffer[fCurDataOffset-1] = fInputBuffer[1]&~0x80; // FU header (no S bit)
60 numExtraHeaderBytes = 2;
61 } else { // 265
62 fInputBuffer[fCurDataOffset-3] = fInputBuffer[0]; // Payload header (1st byte)
63 fInputBuffer[fCurDataOffset-2] = fInputBuffer[1]; // Payload header (2nd byte)
64 fInputBuffer[fCurDataOffset-1] = fInputBuffer[2]&~0x80; // FU header (no S bit)
65 numExtraHeaderBytes = 3;
66 }
67 unsigned numBytesToSend = numExtraHeaderBytes + (fNumValidDataBytes - fCurDataOffset);
68 if (numBytesToSend > fMaxSize) {
69 // We can't send all of the remaining data this time:
70 numBytesToSend = fMaxSize;
71 fLastFragmentCompletedNALUnit = False;
72 } else {
73 // This is the last fragment:
74 fInputBuffer[fCurDataOffset-1] |= 0x40; // set the E bit in the FU header
75 fNumTruncatedBytes = fSaveNumTruncatedBytes;
76 }
77 memmove(fTo, &fInputBuffer[fCurDataOffset-numExtraHeaderBytes], numBytesToSend);
78 fFrameSize = numBytesToSend;
79 fCurDataOffset += numBytesToSend - numExtraHeaderBytes;
80 }
81
82 if (fCurDataOffset >= fNumValidDataBytes) {
83 // We're done with this data. Reset the pointers for receiving new data:
84 fNumValidDataBytes = fCurDataOffset = 1;
85 }
86
87 // Complete delivery to the client:
88 FramedSource::afterGetting(this); // 回调MultiFramedRTPSink::afterGettingFrame函数
89 }
90 }
1 void ByteStreamFileSource::doGetNextFrame() {
2 if (feof(fFid) || ferror(fFid) || (fLimitNumBytesToStream && fNumBytesToStream == 0)) {
3 handleClosure();
4 return;
5 }
6
7 #ifdef READ_FROM_FILES_SYNCHRONOUSLY
8 doReadFromFile(); // 读文件
9 #else
10 if (!fHaveStartedReading) {
11 // Await readable data from the file:
12 envir().taskScheduler().turnOnBackgroundReadHandling(fileno(fFid),
13 (TaskScheduler::BackgroundHandlerProc*)&fileReadableHandler, this);
14 fHaveStartedReading = True;
15 }
16 #endif
17 }
18
19 void ByteStreamFileSource::doReadFromFile() {
20 // Try to read as many bytes as will fit in the buffer provided (or "fPreferredFrameSize" if less)
21 if (fLimitNumBytesToStream && fNumBytesToStream < (u_int64_t)fMaxSize) {
22 fMaxSize = (unsigned)fNumBytesToStream;
23 }
24 if (fPreferredFrameSize > 0 && fPreferredFrameSize < fMaxSize) {
25 fMaxSize = fPreferredFrameSize;
26 }
27 #ifdef READ_FROM_FILES_SYNCHRONOUSLY
28 fFrameSize = fread(fTo, 1, fMaxSize, fFid); //调用fread函数读取数据
29 #else
30 if (fFidIsSeekable) {
31 fFrameSize = fread(fTo, 1, fMaxSize, fFid);
32 } else {
33 // For non-seekable files (e.g., pipes), call "read()" rather than "fread()", to ensure that the read doesn't block:
34 fFrameSize = read(fileno(fFid), fTo, fMaxSize);
35 }
36 #endif
37 if (fFrameSize == 0) {
38 handleClosure();
39 return;
40 }
41 fNumBytesToStream -= fFrameSize;
42
43 // Set the 'presentation time':
44 if (fPlayTimePerFrame > 0 && fPreferredFrameSize > 0) {
45 if (fPresentationTime.tv_sec == 0 && fPresentationTime.tv_usec == 0) {
46 // This is the first frame, so use the current time:
47 gettimeofday(&fPresentationTime, NULL);
48 } else {
49 // Increment by the play time of the previous data:
50 unsigned uSeconds = fPresentationTime.tv_usec + fLastPlayTime;
51 fPresentationTime.tv_sec += uSeconds/1000000;
52 fPresentationTime.tv_usec = uSeconds%1000000;
53 }
54
55 // Remember the play time of this data:
56 fLastPlayTime = (fPlayTimePerFrame*fFrameSize)/fPreferredFrameSize;
57 fDurationInMicroseconds = fLastPlayTime;
58 } else {
59 // We don't know a specific play time duration for this data,
60 // so just record the current time as being the 'presentation time':
61 gettimeofday(&fPresentationTime, NULL);
62 }
63 //读取数据后,调用FramedSource::afterGetting函数
64 // Inform the reader that he has data:
65 #ifdef READ_FROM_FILES_SYNCHRONOUSLY
66 // To avoid possible infinite recursion, we need to return to the event loop to do this:
67 nextTask() = envir().taskScheduler().scheduleDelayedTask(0,
68 (TaskFunc*)FramedSource::afterGetting, this);
69 #else
70 // Because the file read was done from the event loop, we can call the
71 // 'after getting' function directly, without risk of infinite recursion:
72 FramedSource::afterGetting(this);
73 #endif
74 }
75
76 void FramedSource::afterGetting(FramedSource* source) {
77 source->fIsCurrentlyAwaitingData = False;
78 // indicates that we can be read again
79 // Note that this needs to be done here, in case the "fAfterFunc"
80 // called below tries to read another frame (which it usually will)
81
82 if (source->fAfterGettingFunc != NULL) {
83 (*(source->fAfterGettingFunc))(source->fAfterGettingClientData,
84 source->fFrameSize, source->fNumTruncatedBytes,
85 source->fPresentationTime,
86 source->fDurationInMicroseconds);
87 }
88 }
在FramedSource::afterGetting函数中调用fAfterGettingFunc函数,对于ByteStreamFileSource对象,fAfterGettingFunc在之前被设置为H264or5Fragmenter::afterGettingFrame函数:
1 void H264or5Fragmenter::afterGettingFrame(void* clientData, unsigned frameSize,
2 unsigned numTruncatedBytes,
3 struct timeval presentationTime,
4 unsigned durationInMicroseconds) {
5 H264or5Fragmenter* fragmenter = (H264or5Fragmenter*)clientData;
6 fragmenter->afterGettingFrame1(frameSize, numTruncatedBytes, presentationTime,
7 durationInMicroseconds);
8 }
9
10 void H264or5Fragmenter::afterGettingFrame1(unsigned frameSize,
11 unsigned numTruncatedBytes,
12 struct timeval presentationTime,
13 unsigned durationInMicroseconds) {
14 fNumValidDataBytes += frameSize;
15 fSaveNumTruncatedBytes = numTruncatedBytes;
16 fPresentationTime = presentationTime;
17 fDurationInMicroseconds = durationInMicroseconds;
18
19 // Deliver data to the client:
20 doGetNextFrame();
21 }
在H264or5Fragmenter::afterGettingFrame1函数中又调用了H264or5Fragmenter::doGetNextFrame函数,在H264or5Fragmenter::doGetNextFrame函数中,当读取的帧数据满足条件时就又回调MultiFrameRTPSink的afterGettingFrame函数。
1 void MultiFramedRTPSink
2 ::afterGettingFrame(void* clientData, unsigned numBytesRead,
3 unsigned numTruncatedBytes,
4 struct timeval presentationTime,
5 unsigned durationInMicroseconds) {
6 MultiFramedRTPSink* sink = (MultiFramedRTPSink*)clientData;
7 sink->afterGettingFrame1(numBytesRead, numTruncatedBytes,
8 presentationTime, durationInMicroseconds);
9 }
10
11 void MultiFramedRTPSink
12 ::afterGettingFrame1(unsigned frameSize, unsigned numTruncatedBytes,
13 struct timeval presentationTime,
14 unsigned durationInMicroseconds) {
15 if (fIsFirstPacket) {
16 // Record the fact that we're starting to play now:
17 gettimeofday(&fNextSendTime, NULL);
18 }
19
20 fMostRecentPresentationTime = presentationTime;
21 if (fInitialPresentationTime.tv_sec == 0 && fInitialPresentationTime.tv_usec == 0) {
22 fInitialPresentationTime = presentationTime;
23 }
24
25 if (numTruncatedBytes > 0) { //超出缓冲区大小要被舍弃的数据
26 unsigned const bufferSize = fOutBuf->totalBytesAvailable();
27 envir() << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size ("
28 << bufferSize << "). "
29 << numTruncatedBytes << " bytes of trailing data was dropped! Correct this by increasing \"OutPacketBuffer::maxSize\" to at least "
30 << OutPacketBuffer::maxSize + numTruncatedBytes << ", *before* creating this 'RTPSink'. (Current value is "
31 << OutPacketBuffer::maxSize << ".)\n";
32 }
33 unsigned curFragmentationOffset = fCurFragmentationOffset;
34 unsigned numFrameBytesToUse = frameSize;
35 unsigned overflowBytes = 0;
36
37 // If we have already packed one or more frames into this packet,
38 // check whether this new frame is eligible to be packed after them.
39 // (This is independent of whether the packet has enough room for this
40 // new frame; that check comes later.)
41 if (fNumFramesUsedSoFar > 0) { // 在这个RTP包中已经包含了若干帧数据
42 if ((fPreviousFrameEndedFragmentation
43 && !allowOtherFramesAfterLastFragment())
44 || !frameCanAppearAfterPacketStart(fOutBuf->curPtr(), frameSize)) { //这个RTP包中不允许再添加多余的帧(比如:前面的帧作了结尾标记)
45 // Save away this frame for next time:
46 numFrameBytesToUse = 0;
47 fOutBuf->setOverflowData(fOutBuf->curPacketSize(), frameSize,
48 presentationTime, durationInMicroseconds);
49 }
50 }
51 fPreviousFrameEndedFragmentation = False;
52
53 if (numFrameBytesToUse > 0) { // 允许将这一帧添加到RTP包中,但要检查大小是否超出了RTP包的剩余空间
54 // Check whether this frame overflows the packet
55 if (fOutBuf->wouldOverflow(frameSize)) { //这一帧数据超出了RTP包剩余空间
56 // Don't use this frame now; instead, save it as overflow data, and
57 // send it in the next packet instead. However, if the frame is too
58 // big to fit in a packet by itself, then we need to fragment it (and
59 // use some of it in this packet, if the payload format permits this.)
60 if (isTooBigForAPacket(frameSize)
61 && (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) { //发送这一帧数据的一部分
62 // We need to fragment this frame, and use some of it now:
63 overflowBytes = computeOverflowForNewFrame(frameSize);
64 numFrameBytesToUse -= overflowBytes;
65 fCurFragmentationOffset += numFrameBytesToUse;
66 } else {
67 // We don't use any of this frame now: // 不添加这一帧数据
68 overflowBytes = frameSize;
69 numFrameBytesToUse = 0;
70 }
71 fOutBuf->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse, // 标记超出帧的位置和大小,以便后面调整packet
72 overflowBytes, presentationTime, durationInMicroseconds);
73 } else if (fCurFragmentationOffset > 0) {
74 // This is the last fragment of a frame that was fragmented over
75 // more than one packet. Do any special handling for this case:
76 fCurFragmentationOffset = 0;
77 fPreviousFrameEndedFragmentation = True;
78 }
79 }
80
81 if (numFrameBytesToUse == 0 && frameSize > 0) { // 读取适当的数据后,开始发送RTP包
82 // Send our packet now, because we have filled it up:
83 sendPacketIfNecessary();
84 } else {
85 // Use this frame in our outgoing packet:
86 unsigned char* frameStart = fOutBuf->curPtr();
87 fOutBuf->increment(numFrameBytesToUse);
88 // do this now, in case "doSpecialFrameHandling()" calls "setFramePadding()" to append padding bytes
89
90 // Here's where any payload format specific processing gets done:
91 doSpecialFrameHandling(curFragmentationOffset, frameStart,
92 numFrameBytesToUse, presentationTime,
93 overflowBytes);
94
95 ++fNumFramesUsedSoFar;
96
97 // Update the time at which the next packet should be sent, based
98 // on the duration of the frame that we just packed into it.
99 // However, if this frame has overflow data remaining, then don't
100 // count its duration yet.
101 if (overflowBytes == 0) {
102 fNextSendTime.tv_usec += durationInMicroseconds;
103 fNextSendTime.tv_sec += fNextSendTime.tv_usec/1000000;
104 fNextSendTime.tv_usec %= 1000000;
105 }
106
107 // Send our packet now if (i) it's already at our preferred size, or
108 // (ii) (heuristic) another frame of the same size as the one we just
109 // read would overflow the packet, or
110 // (iii) it contains the last fragment of a fragmented frame, and we
111 // don't allow anything else to follow this or
112 // (iv) one frame per packet is allowed:
113 if (fOutBuf->isPreferredSize()
114 || fOutBuf->wouldOverflow(numFrameBytesToUse)
115 || (fPreviousFrameEndedFragmentation &&
116 !allowOtherFramesAfterLastFragment())
117 || !frameCanAppearAfterPacketStart(fOutBuf->curPtr() - frameSize,
118 frameSize) ) {
119 // The packet is ready to be sent now
120 sendPacketIfNecessary();
121 } else {
122 // There's room for more frames; try getting another:
123 packFrame(); // 调用packFrame函数读取更多的数据
124 }
125 }
126 }
在MultiFramedRTPSink中尽量读取多的帧之后,调用sendPacketIfNecessary函数发送给客户端:
1 void MultiFramedRTPSink::sendPacketIfNecessary() {
2 if (fNumFramesUsedSoFar > 0) {
3 // Send the packet:
4 #ifdef TEST_LOSS
5 if ((our_random()%10) != 0) // simulate 10% packet loss #####
6 #endif
7 if (!fRTPInterface.sendPacket(fOutBuf->packet(), fOutBuf->curPacketSize())) { // 通过RTPInterface发送RTP包
8 // if failure handler has been specified, call it
9 if (fOnSendErrorFunc != NULL) (*fOnSendErrorFunc)(fOnSendErrorData);
10 }
11 ++fPacketCount;
12 fTotalOctetCount += fOutBuf->curPacketSize();
13 fOctetCount += fOutBuf->curPacketSize()
14 - rtpHeaderSize - fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes;
15
16 ++fSeqNo; // for next time
17 }
18
19 if (fOutBuf->haveOverflowData() //未发送的帧数据,对RTP包作出调整
20 && fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize()/2) {
21 // Efficiency hack: Reset the packet start pointer to just in front of
22 // the overflow data (allowing for the RTP header and special headers),
23 // so that we probably don't have to "memmove()" the overflow data
24 // into place when building the next packet:
25 unsigned newPacketStart = fOutBuf->curPacketSize()
26 - (rtpHeaderSize + fSpecialHeaderSize + frameSpecificHeaderSize());
27 fOutBuf->adjustPacketStart(newPacketStart);
28 } else {
29 // Normal case: Reset the packet start pointer back to the start:
30 fOutBuf->resetPacketStart();
31 }
32 fOutBuf->resetOffset();
33 fNumFramesUsedSoFar = 0;
34
35 if (fNoFramesLeft) {
36 // We're done:
37 onSourceClosure();
38 } else {
39 // We have more frames left to send. Figure out when the next frame
40 // is due to start playing, then make sure that we wait this long before
41 // sending the next packet.
42 struct timeval timeNow;
43 gettimeofday(&timeNow, NULL);
44 int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec;
45 int64_t uSecondsToGo = secsDiff*1000000 + (fNextSendTime.tv_usec - timeNow.tv_usec);
46 if (uSecondsToGo < 0 || secsDiff < 0) { // sanity check: Make sure that the time-to-delay is non-negative:
47 uSecondsToGo = 0;
48 }
49
50 // Delay this amount of time: // 准备下一次发送RTP包
51 nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo, (TaskFunc*)sendNext, this);
52 }
53 }
54
55 // The following is called after each delay between packet sends:
56 void MultiFramedRTPSink::sendNext(void* firstArg) {
57 MultiFramedRTPSink* sink = (MultiFramedRTPSink*)firstArg;
58 sink->buildAndSendPacket(False); //循环调用buildAndSendPacket
59 }
以上的调用函数过程比较乱,特附一张图以更清晰地展示以上的流程
服务器端通过RTPSink去读数据,在RTPSink中又通过FramedSource读数据,读完数据后交给RTPSink处理,RTPSink处理完后继续通过FramedSource读取数据,如此在RTPSink和FramedSoruce之间形成一个循环,这是Live555读取发送数据的总体流程。
以上便是从建立RTSP连接到发送RTP数据的流程(以H264文件为例),后面的停止发送数据到断开连接不再关注和详述。
作者:昨夜星辰