
Live555 Study Notes (4): The Process of Establishing an RTSP Connection (RTSP Client)

2019-08-25

Live555 implements not only an RTSP server but also an RTSP client. In this post we use the testRTSPClient.cpp demo program to walk through how the Live555 RTSP client establishes an RTSP connection with the server.

Let's start with the main function:

char eventLoopWatchVariable = 0;

int main(int argc, char** argv) {
  // Begin by setting up our usage environment:
  TaskScheduler* scheduler = BasicTaskScheduler::createNew();
  UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler);

  // We need at least one "rtsp://" URL argument:
  if (argc < 2) {
    usage(*env, argv[0]);
    return 1;
  }

  // There are argc-1 URLs: argv[1] through argv[argc-1].  Open and start streaming each one:
  for (int i = 1; i <= argc-1; ++i) {
    openURL(*env, argv[0], argv[i]);
  }

  // All subsequent activity takes place within the event loop:
  env->taskScheduler().doEventLoop(&eventLoopWatchVariable);
    // This function call does not return, unless, at some point in time, "eventLoopWatchVariable" gets set to something non-zero.

  return 0;

  // If you choose to continue the application past this point (i.e., if you comment out the "return 0;" statement above),
  // and if you don't intend to do anything more with the "TaskScheduler" and "UsageEnvironment" objects,
  // then you can also reclaim the (small) memory used by these objects by uncommenting the following code:
  /*
    env->reclaim(); env = NULL;
    delete scheduler; scheduler = NULL;
  */
}

As in testOnDemandRTSPServer.cpp, the program first creates a TaskScheduler object and a UsageEnvironment object. It then calls openURL() for each media resource it wants to receive, passing the resource's RTSP URL as a parameter, and finally enters the main event loop.
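
One detail worth noting: doEventLoop() watches the char pointed to by its argument and returns as soon as that char becomes non-zero. The following is a minimal, hypothetical sketch (not part of testRTSPClient.cpp; stopHandler() and the 10-second delay are made up for illustration) showing how a scheduled task can be used to break out of the loop:

#include "BasicUsageEnvironment.hh"

char eventLoopWatchVariable = 0;

static void stopHandler(void* /*clientData*/) {
  eventLoopWatchVariable = 1; // doEventLoop() returns once this becomes non-zero
}

int main() {
  TaskScheduler* scheduler = BasicTaskScheduler::createNew();
  UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler);

  // Ask the scheduler to call stopHandler() after roughly 10 seconds:
  env->taskScheduler().scheduleDelayedTask(10 * 1000000, (TaskFunc*)stopHandler, NULL);

  env->taskScheduler().doEventLoop(&eventLoopWatchVariable); // blocks until the watch variable is set

  // Reclaim the environment and scheduler once the loop has exited:
  env->reclaim(); env = NULL;
  delete scheduler; scheduler = NULL;
  return 0;
}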

void openURL(UsageEnvironment& env, char const* progName, char const* rtspURL) {
  // Begin by creating a "RTSPClient" object.  Note that there is a separate "RTSPClient" object for each stream that we wish
  // to receive (even if more than one stream uses the same "rtsp://" URL).
  RTSPClient* rtspClient = ourRTSPClient::createNew(env, rtspURL, RTSP_CLIENT_VERBOSITY_LEVEL, progName);
  if (rtspClient == NULL) {
    env << "Failed to create a RTSP client for URL \"" << rtspURL << "\": " << env.getResultMsg() << "\n";
    return;
  }

  ++rtspClientCount;

  // Next, send a RTSP "DESCRIBE" command, to get a SDP description for the stream.
  // Note that this command - like all RTSP commands - is sent asynchronously; we do not block, waiting for a response.
  // Instead, the following function call returns immediately, and we handle the RTSP response later, from within the event loop:
  rtspClient->sendDescribeCommand(continueAfterDESCRIBE); // send the DESCRIBE command, passing in the response-handler callback
}

openURL() is straightforward: it creates an RTSPClient object, where one RTSPClient object represents one RTSP client. It then calls sendDescribeCommand() to send the DESCRIBE command, with continueAfterDESCRIBE as the callback, which is invoked once the server's reply to the DESCRIBE command arrives.
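
ourRTSPClient is a small subclass of RTSPClient defined in testRTSPClient.cpp; its only addition is a StreamClientState member, scs, which holds the per-stream state (the MediaSession, the current MediaSubsession, the subsession iterator, and so on) that the callbacks below keep referring to. Roughly (abridged and quoted from memory, so treat the details as approximate), the declarations look like this:

class StreamClientState {
public:
  StreamClientState();
  virtual ~StreamClientState();

public:
  MediaSubsessionIterator* iter;
  MediaSession* session;
  MediaSubsession* subsession;
  TaskToken streamTimerTask;
  double duration;
};

class ourRTSPClient: public RTSPClient {
public:
  static ourRTSPClient* createNew(UsageEnvironment& env, char const* rtspURL,
                                  int verbosityLevel = 0,
                                  char const* applicationName = NULL,
                                  portNumBits tunnelOverHTTPPortNum = 0);

protected:
  ourRTSPClient(UsageEnvironment& env, char const* rtspURL,
                int verbosityLevel, char const* applicationName, portNumBits tunnelOverHTTPPortNum);
  virtual ~ourRTSPClient();

public:
  StreamClientState scs; // all of the "scs" references in the callbacks resolve to this member
};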

void continueAfterDESCRIBE(RTSPClient* rtspClient, int resultCode, char* resultString) {
  do {
    UsageEnvironment& env = rtspClient->envir(); // alias
    StreamClientState& scs = ((ourRTSPClient*)rtspClient)->scs; // alias

    if (resultCode != 0) { // a non-zero result code indicates an error
      env << *rtspClient << "Failed to get a SDP description: " << resultString << "\n";
      delete[] resultString;
      break;
    }
    // resultString is the SDP description string returned by the server
    char* const sdpDescription = resultString;
    env << *rtspClient << "Got a SDP description:\n" << sdpDescription << "\n";

    // Create a media session object from this SDP description:
    scs.session = MediaSession::createNew(env, sdpDescription); // create a MediaSession object from the SDP description
    delete[] sdpDescription; // because we don't need it anymore
    if (scs.session == NULL) {
      env << *rtspClient << "Failed to create a MediaSession object from the SDP description: " << env.getResultMsg() << "\n";
      break;
    } else if (!scs.session->hasSubsessions()) {
      env << *rtspClient << "This session has no media subsessions (i.e., no \"m=\" lines)\n";
      break;
    }

    // Then, create and set up our data source objects for the session.  We do this by iterating over the session's 'subsessions',
    // calling "MediaSubsession::initiate()", and then sending a RTSP "SETUP" command, on each one.
    // (Each 'subsession' will have its own data source.)
    scs.iter = new MediaSubsessionIterator(*scs.session);
    setupNextSubsession(rtspClient); // send a SETUP command for each of the server's ServerMediaSubsessions to set up the connection
    return;
  } while (0);

  // An unrecoverable error occurred with this stream.
  shutdownStream(rtspClient);
}

After the client receives the server's reply to the DESCRIBE command and obtains the SDP description, it creates a MediaSession object. MediaSession is the client-side counterpart of ServerMediaSession: a MediaSession represents the client's session for a particular media resource on the server. Similarly, the client has MediaSubsession objects corresponding to the server's ServerMediaSubsessions; they represent sub-sessions of the MediaSession and are created along with the MediaSession itself. The client then sends a SETUP command for each ServerMediaSubsession on the server to set up the connection.
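
For reference, an SDP description for a session with one video subsession and one audio subsession might look roughly like the following (an illustrative example, not output captured from a real server); each "m=" line becomes one MediaSubsession on the client:

v=0
o=- 1545299185370080 1 IN IP4 192.168.1.10
s=Session streamed by "testOnDemandRTSPServer"
t=0 0
m=video 0 RTP/AVP 96
a=rtpmap:96 H264/90000
a=control:track1
m=audio 0 RTP/AVP 97
a=rtpmap:97 MPEG4-GENERIC/44100/2
a=control:track2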

void setupNextSubsession(RTSPClient* rtspClient) {
  UsageEnvironment& env = rtspClient->envir(); // alias
  StreamClientState& scs = ((ourRTSPClient*)rtspClient)->scs; // alias

  scs.subsession = scs.iter->next();
  if (scs.subsession != NULL) {
    if (!scs.subsession->initiate()) { // call initiate() to initialize the MediaSubsession object
      env << *rtspClient << "Failed to initiate the \"" << *scs.subsession << "\" subsession: " << env.getResultMsg() << "\n";
      setupNextSubsession(rtspClient); // give up on this subsession; go to the next one
    } else {
      env << *rtspClient << "Initiated the \"" << *scs.subsession << "\" subsession (";
      if (scs.subsession->rtcpIsMuxed()) {
        env << "client port " << scs.subsession->clientPortNum();
      } else {
        env << "client ports " << scs.subsession->clientPortNum() << "-" << scs.subsession->clientPortNum()+1;
      }
      env << ")\n";

      // Continue setting up this subsession, by sending a RTSP "SETUP" command:
      rtspClient->sendSetupCommand(*scs.subsession, continueAfterSETUP, False, REQUEST_STREAMING_OVER_TCP);
    }
    return;
  }

  // We've finished setting up all of the subsessions (i.e., connected to every ServerMediaSubsession).
  // Now, send a RTSP "PLAY" command to start the streaming:
  if (scs.session->absStartTime() != NULL) {
    // Special case: The stream is indexed by 'absolute' time, so send an appropriate "PLAY" command:
    rtspClient->sendPlayCommand(*scs.session, continueAfterPLAY, scs.session->absStartTime(), scs.session->absEndTime());
  } else {
    scs.duration = scs.session->playEndTime() - scs.session->playStartTime();
    rtspClient->sendPlayCommand(*scs.session, continueAfterPLAY);
  }
}

void continueAfterSETUP(RTSPClient* rtspClient, int resultCode, char* resultString) {
  do {
    UsageEnvironment& env = rtspClient->envir(); // alias
    StreamClientState& scs = ((ourRTSPClient*)rtspClient)->scs; // alias

    if (resultCode != 0) {
      env << *rtspClient << "Failed to set up the \"" << *scs.subsession << "\" subsession: " << resultString << "\n";
      break;
    }

    env << *rtspClient << "Set up the \"" << *scs.subsession << "\" subsession (";
    if (scs.subsession->rtcpIsMuxed()) {
      env << "client port " << scs.subsession->clientPortNum();
    } else {
      env << "client ports " << scs.subsession->clientPortNum() << "-" << scs.subsession->clientPortNum()+1;
    }
    env << ")\n";

    // Having successfully setup the subsession, create a data sink for it, and call "startPlaying()" on it.
    // (This will prepare the data sink to receive data; the actual flow of data from the client won't start happening until later,
    // after we've sent a RTSP "PLAY" command.)
    // For each MediaSubsession, create a MediaSink object to request and store the data:
    scs.subsession->sink = DummySink::createNew(env, *scs.subsession, rtspClient->url());
      // perhaps use your own custom "MediaSink" subclass instead
    if (scs.subsession->sink == NULL) {
      env << *rtspClient << "Failed to create a data sink for the \"" << *scs.subsession
          << "\" subsession: " << env.getResultMsg() << "\n";
      break;
    }

    env << *rtspClient << "Created a data sink for the \"" << *scs.subsession << "\" subsession\n";
    scs.subsession->miscPtr = rtspClient; // a hack to let subsession handler functions get the "RTSPClient" from the subsession
    scs.subsession->sink->startPlaying(*(scs.subsession->readSource()),
                                       subsessionAfterPlaying, scs.subsession); // call MediaSink::startPlaying() to prepare for playback
    // Also set a handler to be called if a RTCP "BYE" arrives for this subsession:
    if (scs.subsession->rtcpInstance() != NULL) {
      scs.subsession->rtcpInstance()->setByeHandler(subsessionByeHandler, scs.subsession);
    }
  } while (0);
  delete[] resultString;

  // Set up the next subsession, if any (i.e., connect to the next ServerMediaSubsession):
  setupNextSubsession(rtspClient);
}

setupNextSubsession() first calls MediaSubsession::initiate() to initialize the MediaSubsession, then sends a SETUP command for the corresponding ServerMediaSubsession; when the reply arrives, continueAfterSETUP() is called back. In continueAfterSETUP(), a MediaSink object is created for the MediaSubsession to request and store the data sent by the server, MediaSink::startPlaying() is called to prepare to play the corresponding ServerMediaSubsession, and finally setupNextSubsession() is called again to set up the connection with the next ServerMediaSubsession. setupNextSubsession() checks whether connections have been set up with all of the ServerMediaSubsessions; if so, it sends the PLAY command to request that data transmission begin, and continueAfterPLAY() is called back when the reply arrives.
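
Two constants used in the calls above are defined near the top of testRTSPClient.cpp; as far as I recall they look like the snippet below (values quoted from memory). The fourth argument of sendSetupCommand() is the streamUsingTCP flag: with False the client asks for RTP/RTCP over UDP on the client ports printed above, while changing REQUEST_STREAMING_OVER_TCP to True makes the client request RTP-over-TCP, i.e. the media data is interleaved on the RTSP TCP connection (useful when UDP is blocked by a firewall or NAT):

// From testRTSPClient.cpp (quoted from memory):
#define RTSP_CLIENT_VERBOSITY_LEVEL 1     // 1 = print each RTSP request/response
#define REQUEST_STREAMING_OVER_TCP False  // change to True to tunnel RTP/RTCP inside the RTSP TCP connection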

Before the client sends the PLAY command, let's first look at MediaSubsession::initiate():

Boolean MediaSubsession::initiate(int useSpecialRTPoffset) {
  if (fReadSource != NULL) return True; // has already been initiated

  do {
    if (fCodecName == NULL) {
      env().setResultMsg("Codec is unspecified");
      break;
    }

    // Create the client sockets - an RTP socket and an RTCP socket - on which to receive incoming data from the server.
    // (Groupsocks will work even for unicast addresses)
    struct in_addr tempAddr;
    tempAddr.s_addr = connectionEndpointAddress();
        // This could get changed later, as a result of a RTSP "SETUP"

    // Use the specified RTP and RTCP ports; the RTP port must be even, and the RTCP port must be (RTP port + 1):
    if (fClientPortNum != 0 && (honorSDPPortChoice || IsMulticastAddress(tempAddr.s_addr))) {
      // The sockets' port numbers were specified for us.  Use these:
      Boolean const protocolIsRTP = strcmp(fProtocolName, "RTP") == 0;
      if (protocolIsRTP && !fMultiplexRTCPWithRTP) {
        fClientPortNum = fClientPortNum&~1;
            // use an even-numbered port for RTP, and the next (odd-numbered) port for RTCP
      }
      if (isSSM()) {
        fRTPSocket = new Groupsock(env(), tempAddr, fSourceFilterAddr, fClientPortNum);
      } else {
        fRTPSocket = new Groupsock(env(), tempAddr, fClientPortNum, 255);
      }
      if (fRTPSocket == NULL) {
        env().setResultMsg("Failed to create RTP socket");
        break;
      }

      if (protocolIsRTP) {
        if (fMultiplexRTCPWithRTP) {
          // Use the RTP 'groupsock' object for RTCP as well:
          fRTCPSocket = fRTPSocket;
        } else {
          // Set our RTCP port to be the RTP port + 1:
          portNumBits const rtcpPortNum = fClientPortNum|1;
          if (isSSM()) {
            fRTCPSocket = new Groupsock(env(), tempAddr, fSourceFilterAddr, rtcpPortNum);
          } else {
            fRTCPSocket = new Groupsock(env(), tempAddr, rtcpPortNum, 255);
          }
        }
      }
    } else {
      // Pick ephemeral (random) RTP and RTCP ports:
      // Port numbers were not specified in advance, so we use ephemeral port numbers.
      // Create sockets until we get a port-number pair (even: RTP; even+1: RTCP).
      // (However, if we're multiplexing RTCP with RTP, then we create only one socket,
      // and the port number can be even or odd.)
      // We need to make sure that we don't keep trying to use the same bad port numbers over
      // and over again, so we store bad sockets in a table, and delete them all when we're done.
      HashTable* socketHashTable = HashTable::create(ONE_WORD_HASH_KEYS);
      if (socketHashTable == NULL) break;
      Boolean success = False;
      NoReuse dummy(env());
          // ensures that our new ephemeral port number won't be one that's already in use

      while (1) {
        // Create a new socket:
        if (isSSM()) {
          fRTPSocket = new Groupsock(env(), tempAddr, fSourceFilterAddr, 0);
        } else {
          fRTPSocket = new Groupsock(env(), tempAddr, 0, 255);
        }
        if (fRTPSocket == NULL) {
          env().setResultMsg("MediaSession::initiate(): unable to create RTP and RTCP sockets");
          break;
        }

        // Get the client port number:
        Port clientPort(0);
        if (!getSourcePort(env(), fRTPSocket->socketNum(), clientPort)) {
          break;
        }
        fClientPortNum = ntohs(clientPort.num());

        if (fMultiplexRTCPWithRTP) {
          // Use this RTP 'groupsock' object for RTCP as well:
          fRTCPSocket = fRTPSocket;
          success = True;
          break;
        }

        // To be usable for RTP, the client port number must be even:
        if ((fClientPortNum&1) != 0) { // it's odd
          // Record this socket in our table, and keep trying:
          unsigned key = (unsigned)fClientPortNum;
          Groupsock* existing = (Groupsock*)socketHashTable->Add((char const*)key, fRTPSocket);
          delete existing; // in case it wasn't NULL
          continue;
        }

        // Make sure we can use the next (i.e., odd) port number, for RTCP:
        portNumBits rtcpPortNum = fClientPortNum|1;
        if (isSSM()) {
          fRTCPSocket = new Groupsock(env(), tempAddr, fSourceFilterAddr, rtcpPortNum);
        } else {
          fRTCPSocket = new Groupsock(env(), tempAddr, rtcpPortNum, 255);
        }
        if (fRTCPSocket != NULL && fRTCPSocket->socketNum() >= 0) {
          // Success! Use these two sockets.
          success = True;
          break;
        } else {
          // We couldn't create the RTCP socket (perhaps that port number's already in use elsewhere?).
          delete fRTCPSocket; fRTCPSocket = NULL;

          // Record the first socket in our table, and keep trying:
          unsigned key = (unsigned)fClientPortNum;
          Groupsock* existing = (Groupsock*)socketHashTable->Add((char const*)key, fRTPSocket);
          delete existing; // in case it wasn't NULL
          continue;
        }
      }

      // Clean up the socket hash table (and contents):
      Groupsock* oldGS;
      while ((oldGS = (Groupsock*)socketHashTable->RemoveNext()) != NULL) {
        delete oldGS;
      }
      delete socketHashTable;

      if (!success) break; // a fatal error occurred trying to create the RTP and RTCP sockets; we can't continue
    }

    // Try to use a big receive buffer for RTP - at least 0.1 second of
    // specified bandwidth and at least 50 KB
    unsigned rtpBufSize = fBandwidth * 25 / 2; // 1 kbps * 0.1 s = 12.5 bytes
    if (rtpBufSize < 50 * 1024)
      rtpBufSize = 50 * 1024;
    increaseReceiveBufferTo(env(), fRTPSocket->socketNum(), rtpBufSize);

    if (isSSM() && fRTCPSocket != NULL) {
      // Special case for RTCP SSM: Send RTCP packets back to the source via unicast:
      fRTCPSocket->changeDestinationParameters(fSourceFilterAddr,0,~0);
    }

    // Create a FramedSource object that will request the data.
    // Create "fRTPSource" and "fReadSource":
    if (!createSourceObjects(useSpecialRTPoffset)) break;

    if (fReadSource == NULL) {
      env().setResultMsg("Failed to create read source");
      break;
    }

    // Create an RTCPInstance object.
    // Finally, create our RTCP instance. (It starts running automatically)
    if (fRTPSource != NULL && fRTCPSocket != NULL) {
      // If bandwidth is specified, use it and add 5% for RTCP overhead.
      // Otherwise make a guess at 500 kbps.
      unsigned totSessionBandwidth
        = fBandwidth ? fBandwidth + fBandwidth / 20 : 500;
      fRTCPInstance = RTCPInstance::createNew(env(), fRTCPSocket,
                                              totSessionBandwidth,
                                              (unsigned char const*)fParent.CNAME(),
                                              NULL /* we're a client */,
                                              fRTPSource);
      if (fRTCPInstance == NULL) {
        env().setResultMsg("Failed to create RTCP instance");
        break;
      }
    }

    return True;
  } while (0);

  deInitiate();
  fClientPortNum = 0;
  return False;
}

In MediaSubsession::initiate(), two client sockets are created first, one for receiving RTP data and one for receiving RTCP data. Then a FramedSource object is created to request data from the server. It is created inside createSourceObjects(), which instantiates a different FramedSource subclass depending on the format of the ServerMediaSubsession's resource; taking H.264 video as our example again, an H264VideoRTPSource object is created. Finally, an RTCPInstance object is created as well.
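
Note the receive-buffer sizing near the end of initiate(): the bandwidth value comes from the SDP "b=AS:<kbps>" line, and fBandwidth * 25 / 2 bytes corresponds to about 0.1 s of that bit rate (1 kbps for 0.1 s is 12.5 bytes), with a floor of 50 KB. For high-bit-rate H.264 streams whose SDP carries no (or only a small) "b=AS:" value, 50 KB can be too small and frames may arrive truncated. A common workaround, shown below as a hypothetical sketch (the 2 MB figure is arbitrary; this code is not part of testRTSPClient.cpp), is to enlarge the RTP socket's receive buffer yourself after initiate() has succeeded, for example inside continueAfterSETUP():

#include "GroupsockHelper.hh" // declares increaseReceiveBufferTo()

// ... inside continueAfterSETUP(), after scs.subsession->initiate() has succeeded:
if (scs.subsession->rtpSource() != NULL) {
  int socketNum = scs.subsession->rtpSource()->RTPgs()->socketNum();
  increaseReceiveBufferTo(env, socketNum, 2000000); // ~2 MB; an arbitrary illustrative value
}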

Next, let's look at continueAfterPLAY(), which is called after the client receives the reply to the PLAY command:

void continueAfterPLAY(RTSPClient* rtspClient, int resultCode, char* resultString) {
  Boolean success = False;

  do {
    UsageEnvironment& env = rtspClient->envir(); // alias
    StreamClientState& scs = ((ourRTSPClient*)rtspClient)->scs; // alias

    if (resultCode != 0) {
      env << *rtspClient << "Failed to start playing session: " << resultString << "\n";
      break;
    }

    // Set a timer to be handled at the end of the stream's expected duration (if the stream does not already signal its end
    // using a RTCP "BYE").  This is optional.  If, instead, you want to keep the stream active - e.g., so you can later
    // 'seek' back within it and do another RTSP "PLAY" - then you can omit this code.
    // (Alternatively, if you don't want to receive the entire stream, you could set this timer for some shorter value.)
    if (scs.duration > 0) {
      unsigned const delaySlop = 2; // number of seconds extra to delay, after the stream's expected duration.  (This is optional.)
      scs.duration += delaySlop;
      unsigned uSecsToDelay = (unsigned)(scs.duration*1000000);
      scs.streamTimerTask = env.taskScheduler().scheduleDelayedTask(uSecsToDelay, (TaskFunc*)streamTimerHandler, rtspClient);
    }

    env << *rtspClient << "Started playing session";
    if (scs.duration > 0) {
      env << " (for up to " << scs.duration << " seconds)";
    }
    env << "...\n";

    success = True;
  } while (0);
  delete[] resultString;

  if (!success) {
    // An unrecoverable error occurred with this stream.
    shutdownStream(rtspClient);
  }
}

continueAfterPLAY() is very simple: it essentially just prints "Started playing session". Once the server receives the PLAY command, it starts sending RTP and RTCP packets to the client, and the client, having already called MediaSink::startPlaying(), is waiting to receive the video data from the server.

The MediaSink created in continueAfterSETUP() is a DummySink object; DummySink is a subclass of MediaSink. In this example the client does nothing with the video data it receives, hence the name DummySink.
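
For completeness, here is roughly how DummySink is declared in testRTSPClient.cpp (abridged and quoted from memory); fReceiveBuffer is a heap buffer of DUMMY_SINK_RECEIVE_BUFFER_SIZE bytes into which each frame is copied:

// Define the size of the buffer that we'll use:
#define DUMMY_SINK_RECEIVE_BUFFER_SIZE 100000

class DummySink: public MediaSink {
public:
  static DummySink* createNew(UsageEnvironment& env,
                              MediaSubsession& subsession, // identifies the kind of data that's being received
                              char const* streamId = NULL); // identifies the stream itself (optional)

private:
  DummySink(UsageEnvironment& env, MediaSubsession& subsession, char const* streamId);
  virtual ~DummySink();

  static void afterGettingFrame(void* clientData, unsigned frameSize,
                                unsigned numTruncatedBytes,
                                struct timeval presentationTime,
                                unsigned durationInMicroseconds);
  void afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes,
                         struct timeval presentationTime, unsigned durationInMicroseconds);

private:
  // redefined virtual function:
  virtual Boolean continuePlaying();

private:
  u_int8_t* fReceiveBuffer;
  MediaSubsession& fSubsession;
  char* fStreamId;
};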

The client calls MediaSink::startPlaying() to start receiving data from the server. This is the same function we saw earlier when discussing how the server establishes an RTSP connection:

Boolean MediaSink::startPlaying(MediaSource& source,
                                afterPlayingFunc* afterFunc,
                                void* afterClientData) {
  // Make sure we're not already being played:
  if (fSource != NULL) {
    envir().setResultMsg("This sink is already being played");
    return False;
  }

  // Make sure our source is compatible:
  if (!sourceIsCompatibleWithUs(source)) {
    envir().setResultMsg("MediaSink::startPlaying(): source is not compatible!");
    return False;
  }
  fSource = (FramedSource*)&source; // here fSource is the H264VideoRTPSource object created earlier

  fAfterFunc = afterFunc;
  fAfterClientData = afterClientData;
  return continuePlaying();
}

MediaSink::startPlaying() in turn calls DummySink::continuePlaying():

Boolean DummySink::continuePlaying() {
  if (fSource == NULL) return False; // sanity check (should not happen)

  // Request the next frame of data from our input source.  "afterGettingFrame()" will get called later, when it arrives:
  fSource->getNextFrame(fReceiveBuffer, DUMMY_SINK_RECEIVE_BUFFER_SIZE,
                        afterGettingFrame, this,
                        onSourceClosure, this);
  return True;
}

In DummySink::continuePlaying(), data is requested from the server through the H264VideoRTPSource object (H264VideoRTPSource is a subclass of MultiFramedRTPSource); once a frame has been received, DummySink::afterGettingFrame() is called back. FramedSource::getNextFrame() in turn calls MultiFramedRTPSource::doGetNextFrame():
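
For reference, the request interface used in continuePlaying() is FramedSource::getNextFrame(), declared in FramedSource.hh roughly as follows (abridged, quoted from memory): the caller supplies a destination buffer, its size, a callback to invoke once a frame has been delivered into the buffer, and a callback to invoke if the source closes:

// Abridged from FramedSource.hh:
typedef void (afterGettingFunc)(void* clientData, unsigned frameSize,
                                unsigned numTruncatedBytes,
                                struct timeval presentationTime,
                                unsigned durationInMicroseconds);
typedef void (onCloseFunc)(void* clientData);

class FramedSource: public MediaSource {
public:
  void getNextFrame(unsigned char* to, unsigned maxSize,
                    afterGettingFunc* afterGettingFunc, void* afterGettingClientData,
                    onCloseFunc* onCloseFunc, void* onCloseClientData);
  // ...
};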

void MultiFramedRTPSource::doGetNextFrame() {
  if (!fAreDoingNetworkReads) {
    // Turn on background read handling of incoming packets:
    fAreDoingNetworkReads = True;
    TaskScheduler::BackgroundHandlerProc* handler
      = (TaskScheduler::BackgroundHandlerProc*)&networkReadHandler;
    fRTPInterface.startNetworkReading(handler);
        // Read network data via the RTPInterface object (on the server side, the RTPInterface object is used to send network data).
        // Once data has been read, the networkReadHandler callback is invoked to process it.
  }

  fSavedTo = fTo; // the data that is read will be stored in fTo
  fSavedMaxSize = fMaxSize;
  fFrameSize = 0; // for now
  fNeedDelivery = True;
  doGetNextFrame1();
}

void MultiFramedRTPSource::doGetNextFrame1() {
  while (fNeedDelivery) {
    // If we already have packet data available, then deliver it now.
    Boolean packetLossPrecededThis;
    BufferedPacket* nextPacket
      = fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis);
    if (nextPacket == NULL) break;

    fNeedDelivery = False;

    if (nextPacket->useCount() == 0) {
      // Before using the packet, check whether it has a special header
      // that needs to be processed:
      unsigned specialHeaderSize;
      if (!processSpecialHeader(nextPacket, specialHeaderSize)) {
        // Something's wrong with the header; reject the packet:
        fReorderingBuffer->releaseUsedPacket(nextPacket);
        fNeedDelivery = True;
        break;
      }
      nextPacket->skip(specialHeaderSize);
    }

    // Check whether we're part of a multi-packet frame, and whether
    // there was packet loss that would render this packet unusable:
    if (fCurrentPacketBeginsFrame) {
      if (packetLossPrecededThis || fPacketLossInFragmentedFrame) {
        // We didn't get all of the previous frame.
        // Forget any data that we used from it:
        fTo = fSavedTo; fMaxSize = fSavedMaxSize;
        fFrameSize = 0;
      }
      fPacketLossInFragmentedFrame = False;
    } else if (packetLossPrecededThis) {
      // We're in a multi-packet frame, with preceding packet loss
      fPacketLossInFragmentedFrame = True;
    }
    if (fPacketLossInFragmentedFrame) {
      // This packet is unusable; reject it:
      fReorderingBuffer->releaseUsedPacket(nextPacket);
      fNeedDelivery = True;
      break;
    }

    // The packet is usable. Deliver all or part of it to our caller:
    unsigned frameSize;
    nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,
                    fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,
                    fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,
                    fCurPacketMarkerBit);
    fFrameSize += frameSize;

    if (!nextPacket->hasUsableData()) {
      // We're completely done with this packet now
      fReorderingBuffer->releaseUsedPacket(nextPacket);
    }

    if (fCurrentPacketCompletesFrame) { // we have successfully read a complete frame
      // We have all the data that the client wants.
      if (fNumTruncatedBytes > 0) {
        envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size ("
                << fSavedMaxSize << ").  "
                << fNumTruncatedBytes << " bytes of trailing data will be dropped!\n";
      }
      // Call our own 'after getting' function, so that the downstream object can consume the data:
      if (fReorderingBuffer->isEmpty()) {
        // Common case optimization: There are no more queued incoming packets, so this code will not get
        // executed again without having first returned to the event loop.  Call our 'after getting' function
        // directly, because there's no risk of a long chain of recursion (and thus stack overflow):
        afterGetting(this);
      } else {
        // Special case: Call our 'after getting' function via the event loop.
        nextTask() = envir().taskScheduler().scheduleDelayedTask(0,
                                (TaskFunc*)FramedSource::afterGetting, this);
      }
    } else {
      // This packet contained fragmented data, and does not complete
      // the data that the client wants.  Keep getting data:
      fTo += frameSize; fMaxSize -= frameSize;
      fNeedDelivery = True;
    }
  }
}

In doGetNextFrame1(), once a complete frame has been read successfully, FramedSource::afterGetting() is called, which in turn calls back DummySink::afterGettingFrame():

void DummySink::afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes,
                                  struct timeval presentationTime, unsigned durationInMicroseconds) {
  DummySink* sink = (DummySink*)clientData;
  sink->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds);
}

// If you don't want to see debugging output for each received frame, then comment out the following line:
#define DEBUG_PRINT_EACH_RECEIVED_FRAME 1

void DummySink::afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes,
                                  struct timeval presentationTime, unsigned /*durationInMicroseconds*/) {
  // We've just received a frame of data.  (Optionally) print out information about it:
#ifdef DEBUG_PRINT_EACH_RECEIVED_FRAME
  if (fStreamId != NULL) envir() << "Stream \"" << fStreamId << "\"; ";
  envir() << fSubsession.mediumName() << "/" << fSubsession.codecName() << ":\tReceived " << frameSize << " bytes";
  if (numTruncatedBytes > 0) envir() << " (with " << numTruncatedBytes << " bytes truncated)";
  char uSecsStr[6+1]; // used to output the 'microseconds' part of the presentation time
  sprintf(uSecsStr, "%06u", (unsigned)presentationTime.tv_usec);
  envir() << ".\tPresentation time: " << (int)presentationTime.tv_sec << "." << uSecsStr;
  if (fSubsession.rtpSource() != NULL && !fSubsession.rtpSource()->hasBeenSynchronizedUsingRTCP()) {
    envir() << "!"; // mark the debugging output to indicate that this presentation time is not RTCP-synchronized
  }
#ifdef DEBUG_PRINT_NPT
  envir() << "\tNPT: " << fSubsession.getNormalPlayTime(presentationTime);
#endif
  envir() << "\n";
#endif

  // Then continue, to request the next frame of data:
  continuePlaying();
}

DummySink::afterGettingFrame() simply prints how many bytes a given MediaSubsession has received, and then uses the FramedSource to request data again. As you can see, on the RTSP client side Live555 also forms a loop between the MediaSink and the FramedSource, continuously reading data from the server.
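
If you want to do something useful with the data, the usual approach is to replace DummySink with your own MediaSink subclass and put the real work into afterGettingFrame(). Below is a hypothetical sketch (FileSink264 and fOut are invented names, not part of testRTSPClient.cpp): H264VideoRTPSource delivers one H.264 NAL unit per "frame", without the Annex-B start code, so to dump a playable raw H.264 file you prepend "00 00 00 01" to each NAL unit before writing it. For the file to be decodable you would also write the SPS/PPS NAL units (obtainable by parsing MediaSubsession::fmtp_spropparametersets()) at the start of the file.

#include <cstdio>

// Assume FileSink264 is a copy of DummySink with one extra member: FILE* fOut (opened in the constructor).
void FileSink264::afterGettingFrame(unsigned frameSize, unsigned /*numTruncatedBytes*/,
                                    struct timeval /*presentationTime*/,
                                    unsigned /*durationInMicroseconds*/) {
  static unsigned char const startCode[4] = { 0x00, 0x00, 0x00, 0x01 };

  // Each delivered frame is a single H.264 NAL unit; prepend the Annex-B start code and write it out:
  fwrite(startCode, 1, sizeof startCode, fOut);
  fwrite(fReceiveBuffer, 1, frameSize, fOut);

  // Keep the MediaSink <-> FramedSource loop going, exactly as DummySink does:
  continuePlaying();
}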

Author: 昨夜星辰
