SRS-GB28181 实现浅析之二--基本逻辑与结构
在分析基本模块之前我们先说一下, SRS新版中已经把线程模块改为ST协程模型,
每个需要处理数据的地方就需要加一个协程来处理,他的好处是有着多线程的运行特点实则是单线,安全好用。内部对ST都封装了,你只需要实cycle()接口就可以使用。
GB模块中最主要的类有4个, 其它的功能类都是为这几个类服务的,我们来分析一下
1. SrsGbSession
此类一个完整SIP会话的最顶层管理类,内部包含了SIP信令连接类和MEDIA连接类两个变量指针。类只是根据状态进行具体的INVITE.类中保存了当前SESSION 的状态,状态只有三个如下:
enum SrsGbSessionState
{
//可切换到connecting
//在发送invite信令之前
SrsGbSessionStateInit = 0,
//可切换到established状态
// sip完成注册,media完成连接之前
SrsGbSessionStateConnecting,
//可切换到init: media 没有连接,进行reinvite
//可切换到session 销毁: when sip is bye.
//sip是稳定状态, media 完成连接
SrsGbSessionStateEstablished,
};
内部drive_state()是状态驱动过程,依据上面三个状态切换规则进行当前的状态向下一个状态的切换。
srs_error_t SrsGbSession::drive_state()
{
srs_error_t err = srs_success;
#define SRS_GB_CHANGE_STATE_TO(state) { \
SrsGbSessionState ostate = set_state(state); \
srs_trace("Session: Change device=%s, state=%s", sip_->device_id().c_str(), \
srs_gb_state(ostate, state_).c_str()); \
}
if (state_ == SrsGbSessionStateInit) {
if (sip_->is_registered()) {
SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateConnecting);
//记录media 开始边接时间,以计算超时
connecting_starttime_ = srs_update_system_time();
}
// 如果MEDIA 没有连接那么进行invite操作
if (sip_->is_registered() && !media_->is_connected()) {
uint32_t ssrc = 0;
if ((err = sip_->invite_request(&ssrc)) != srs_success) {
return srs_error_wrap(err, "invite");
}
//当PS流来时会根据ssrc来找到session进行处理, 所以把SESSION放入到快查询MAP中
_srs_gb_manager->add_with_fast_id(ssrc, wrapper_);
}
}
if (state_ == SrsGbSessionStateConnecting) {
//处理MEDIA连接的超时
if (srs_update_system_time() - connecting_starttime_ >= connecting_timeout_) {
//如果超时三次那么就返回错误,由上层线程循环根据返回值进行SESSION 销毁
if ((nn_timeout_++) > SRS_GB_MAX_TIMEOUT) {
return srs_error_new(ERROR_GB_TIMEOUT, "timeout");
}
srs_trace("Session: Connecting timeout, nn=%d, state=%s, sip=%s, media=%d", nn_timeout_, srs_gb_session_state(state_).c_str(),
srs_gb_sip_state(sip_->state()).c_str(), media_->is_connected());
//超时了那重置SIP及session状态,那再次执行本函数体时就会重新发起invite请求
sip_->reset_to_register();
SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateInit);
}
//已经完成正常过程,那么就进入了稳定状态
if (sip_->is_stable() && media_->is_connected()) {
SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateEstablished);
}
}
if (state_ == SrsGbSessionStateEstablished) {
//稳定状态下,如果收到了来自设备的BYE指令,直接返回,不再进行处理,由于srs的实现不完善,所以此处是直接返回,依据GB标准,那么应该是断开media连接,恢复到SrsGbSessionStateInit,保持在注册在线状态。
if (sip_->is_bye()) {
srs_trace("Session: Dispose for client bye");
return err;
}
//稳定状态下断开了MEDIA连接,要得新设置SIP及session状态以进行重连,注意要等等一定的时间才能reinvite.
if (!media_->is_connected()) {
if (!reinviting_starttime_) {
//设置reinvite的等待开始计时
reinviting_starttime_ = srs_update_system_time();
}
//到达等待时间设置SIP及session状态以进行REINVITE
if (srs_get_system_time() - reinviting_starttime_ > reinvite_wait_) {
reinviting_starttime_ = 0;
srs_trace("Session: Re-invite for disconnect, state=%s, sip=%s, media=%d", srs_gb_session_state(state_).c_str(),
srs_gb_sip_state(sip_->state()).c_str(), media_->is_connected());
sip_->reset_to_register();
SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateInit);
}
}
}
return err;
}
2. SrsGbSipTcpConn
这个类是SIP信令的处理类,此类在收到注册信令时会通过bind_session()创建SrsGbSession ,也就是先有信令连接再有的SrsGbSession 对象。此类同时为SrsGbSession 提供当前的SIP状态。
类中对于SIP逻辑处理过程主要是在on_sip_message(SrsSipMessage* msg)中,根接收到的信令做出回应,同时做出自身状态的修改。
信令的接收解析和信令发送在两个专门类中完,所以类中包含了SrsGbSipTcpReceiver 及 SrsGbSipTcpSender 两个对象,这两个对象都有自己线程,独立运转。
此类也是有一个状态机,为SrsGbSession的运行提供状态依据。状态的修改也是位于 自身drive_state(msg) 中,此函数的调用是与信令有关的, 它的调用位置只有两个位置,一个是接收命令的处理位置on_sip_message(msg)中,另一个是信令发送时,在放入发送队列的enqueue_sip_message(msg)中。
Sip的状态是比较复杂的,因为整个过程下来环节比较多。我们看下状态的定义:
enum SrsGbSipState
{
//可切换到established状态
// sip完成注册,media完成连接之前
SrsGbSipStateInit = 0,
//可切换到 inviting
//已经完成注册过程,发送invite之前
SrsGbSipStateRegistered,
//可切换到 trying
//可切换到 stable
//已经发送 invit 收到trying回复之前
SrsGbSipStateInviting,
//可切换到 stable
//收到trying时,收到 INVITE 200 OK 之前
SrsGbSipStateTrying,
//这个状态本想着是用作重invite,但实际是没有使用的,reinvite就是走的invite过程
SrsGbSipStateReinviting,
//可切换到re-inviting
//可切换到bye,当收到设备bye时
//收到INVITE 200 OK,发送INVITE OK回复给设备之后
SrsGbSipStateStable,
//收到BYE后, 直接退出session了,你可以看 SrsGbSession::do_cycle()中有//一个判断;按照GB收到BYE后,应该是断开media的连接回到一个注册完成状//态;但SRS实现不完整所以就直接断开了
SrsGbSipStateBye,
};
内部drive_state()是状态驱动过程,依据上面状态切换规则进行当前的状态向下一个状态的切换。
void SrsGbSipTcpConn::drive_state(SrsSipMessage* msg)
{
srs_error_t err = srs_success;
#define SRS_GB_SIP_CHANGE_STATE_TO(state) { \
SrsGbSipState ostate = set_state(state); \
srs_trace("SIP: Change device=%s, state=%s", register_->device_id().c_str(), \
srs_sip_state(ostate, state_).c_str()); \
}
if (state_ == SrsGbSipStateInit) {
// 如果当前是注册命令,我们将修改状态,让SESSION能进入自动invite的过程
if(msg->is_register()&&msg->expires_>0)SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateRegistered);
// 收到 unregister,我们将销毁session,因为可设备要停止流的发布.
if(msg->is_register()&&msg->expires_==0)SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateBye);
// 收到心跳消息,我们切换为stable状态.
if (msg->is_message()) SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateStable);
}
if (state_ == SrsGbSipStateRegistered) {
if (msg->is_invite()) SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateInviting);
}
if (state_ == SrsGbSipStateInviting) {
if (msg->is_trying()) SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateTrying);
if (msg->is_invite_ok()) SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateStable);
//如果设备收到了invite请求并且断开了,它可能会重新注册,这时我们要重新发起INVITE
if (msg->is_register()) {
srs_warn("SIP: Re-invite for got REGISTER in state=%s", srs_gb_sip_state(state_).c_str());
if ((err = invite_request(NULL)) != srs_success) {
// TODO: FIXME: Should fail the SIP session.
srs_freep(err);
}
}
}
if (state_ == SrsGbSipStateTrying) {
if (msg->is_invite_ok()) SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateStable);
}
if (state_ == SrsGbSipStateStable) {
//设备发送 bye 或 unregister, 我们将销毁session,因为它可能不再进行流发布
if (msg->is_register() && msg->expires_ == 0)SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateBye);
if (msg->is_bye()) SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateBye);
}
if (state_ == SrsGbSipStateReinviting) {
if (msg->is_bye_ok()) SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateInviting);
}
}
class SrsGbSipTcpReceiver : public ISrsStartable, public ISrsCoroutineHandler
负责数据的接收解析,解析完成后调用SrsGbSipTcpConn::on_sip_message(SrsSipMessage* msg)进行逻辑过程处理,包括信息回复状态驱动等
class SrsGbSipTcpSender : public ISrsStartable, public ISrsCoroutineHandler
负数数据的发在,所有回复的信息都会经enqueue_sip_message(mag)放入例,Sender协程就是循环取出重组进行发送。
3. SrsGbMediaTcpConn
此类就是一个负责接收媒体流的类, 在收到PS流后会根据SSRC查找session进行绑定关联。
PS流的解析主要依赖SrsPackContext类,解析完成后回调到SrsGbMediaTcpConn 及 session的on_ps_pack(pack_, ps, msgs); 完成流的解析。然后依靠SrsGbMuxer进行 rtmp 推流到核心。
4. SrsGbListener
主要对SIP 和 media 连接端口监听,SRS只实现了TCP的监听连接,在on_tcp_client(ISrsListener* listener, srs_netfd_t stfd)中对进入TCP连接事件处理,根据监听对象的不同生成不同的连接,对SOCKET部分已经在ST协程下进行了封装,此处不再详述。注意在初始化时也有对GB API的添加和监听,此处只有一个public的接口,这是支持扩展服务器媒体流上传的接口。
看一下监听连接的处理
srs_error_t SrsGbListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stfd)
{
srs_error_t err = srs_success;
// Handle TCP connections.
if (listener == sip_listener_) {
//处理SIP连接
SrsGbSipTcpConn* raw_conn = new SrsGbSipTcpConn();
raw_conn->setup(conf_, sip_listener_, media_listener_, stfd);
SrsSharedResource<SrsGbSipTcpConn>* conn = new SrsSharedResource<SrsGbSipTcpConn>(raw_conn);
_srs_gb_manager->add(conn, NULL);
//建立协程
SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, conn, raw_conn, raw_conn);
raw_conn->setup_owner(conn, executor, executor);
//启动协程
if ((err = executor->start()) != srs_success) {
srs_freep(executor);
return srs_error_wrap(err, "gb sip");
}
} else if (listener == media_listener_) {
//建立媒体流接收连接对象
SrsGbMediaTcpConn* raw_conn = new SrsGbMediaTcpConn();
raw_conn->setup(stfd);
SrsSharedResource<SrsGbMediaTcpConn>* conn = new SrsSharedResource<SrsGbMediaTcpConn>(raw_conn);
_srs_gb_manager->add(conn, NULL);
//建立协程
SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, conn, raw_conn, raw_conn);
raw_conn->setup_owner(conn, executor, executor);
//启动协程
if ((err = executor->start()) != srs_success) {
srs_freep(executor);
return srs_error_wrap(err, "gb media");
}
} else {
srs_warn("GB: Ignore TCP client");
srs_close_stfd(stfd);
}
return err;
}