当前位置: 首页 > news >正文

SRS-GB28181 实现浅析之二--基本逻辑与结构

        在分析基本模块之前我们先说一下, SRS新版中已经把线程模块改为ST协程模型,

每个需要处理数据的地方就需要加一个协程来处理,他的好处是有着多线程的运行特点实则是单线,安全好用。内部对ST都封装了,你只需要实cycle()接口就可以使用。

     GB模块中最主要的类有4个, 其它的功能类都是为这几个类服务的,我们来分析一下

1. SrsGbSession

     此类一个完整SIP会话的最顶层管理类,内部包含了SIP信令连接类和MEDIA连接类两个变量指针。类只是根据状态进行具体的INVITE.类中保存了当前SESSION 的状态,状态只有三个如下:
 

enum SrsGbSessionState
{
    //可切换到connecting
    //在发送invite信令之前
    SrsGbSessionStateInit = 0, 

    //可切换到established状态
    // sip完成注册,media完成连接之前
    SrsGbSessionStateConnecting, 

    //可切换到init: media 没有连接,进行reinvite
    //可切换到session 销毁: when sip is bye.
    //sip是稳定状态, media 完成连接
    SrsGbSessionStateEstablished,
};

    内部drive_state()是状态驱动过程,依据上面三个状态切换规则进行当前的状态向下一个状态的切换。

srs_error_t SrsGbSession::drive_state()
{
    srs_error_t err = srs_success;
    #define SRS_GB_CHANGE_STATE_TO(state) { \
        SrsGbSessionState ostate = set_state(state); \
        srs_trace("Session: Change device=%s, state=%s", sip_->device_id().c_str(), \
            srs_gb_state(ostate, state_).c_str()); \
    }
    if (state_ == SrsGbSessionStateInit) {
        if (sip_->is_registered()) {
            SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateConnecting);
           //记录media 开始边接时间,以计算超时
            connecting_starttime_ = srs_update_system_time();
        }

        // 如果MEDIA 没有连接那么进行invite操作
        if (sip_->is_registered() && !media_->is_connected()) {
            uint32_t ssrc = 0;
            if ((err = sip_->invite_request(&ssrc)) != srs_success) {
                return srs_error_wrap(err, "invite");
            }            
           //当PS流来时会根据ssrc来找到session进行处理, 所以把SESSION放入到快查询MAP中
            _srs_gb_manager->add_with_fast_id(ssrc, wrapper_);
        }
    }
if (state_ == SrsGbSessionStateConnecting) {
   //处理MEDIA连接的超时
        if (srs_update_system_time() - connecting_starttime_ >= connecting_timeout_) {
           //如果超时三次那么就返回错误,由上层线程循环根据返回值进行SESSION 销毁
            if ((nn_timeout_++) > SRS_GB_MAX_TIMEOUT) {
                return srs_error_new(ERROR_GB_TIMEOUT, "timeout");
            }
            srs_trace("Session: Connecting timeout, nn=%d, state=%s, sip=%s, media=%d", nn_timeout_, srs_gb_session_state(state_).c_str(),
                srs_gb_sip_state(sip_->state()).c_str(), media_->is_connected());
            //超时了那重置SIP及session状态,那再次执行本函数体时就会重新发起invite请求
            sip_->reset_to_register();
            SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateInit);
        }
        //已经完成正常过程,那么就进入了稳定状态
        if (sip_->is_stable() && media_->is_connected()) {
            SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateEstablished);
        }
    }
if (state_ == SrsGbSessionStateEstablished) {
        //稳定状态下,如果收到了来自设备的BYE指令,直接返回,不再进行处理,由于srs的实现不完善,所以此处是直接返回,依据GB标准,那么应该是断开media连接,恢复到SrsGbSessionStateInit,保持在注册在线状态。
        if (sip_->is_bye()) {
            srs_trace("Session: Dispose for client bye");
            return err;
        }
        //稳定状态下断开了MEDIA连接,要得新设置SIP及session状态以进行重连,注意要等等一定的时间才能reinvite.
        if (!media_->is_connected()) {
            if (!reinviting_starttime_) {
                //设置reinvite的等待开始计时
                reinviting_starttime_ = srs_update_system_time();
            }
            //到达等待时间设置SIP及session状态以进行REINVITE
            if (srs_get_system_time() - reinviting_starttime_ > reinvite_wait_) {
                reinviting_starttime_ = 0;
                srs_trace("Session: Re-invite for disconnect, state=%s, sip=%s, media=%d", srs_gb_session_state(state_).c_str(),
                    srs_gb_sip_state(sip_->state()).c_str(), media_->is_connected());
                sip_->reset_to_register();
                SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateInit);
            }
        }
    }
    return err;
}

2. SrsGbSipTcpConn

      这个类是SIP信令的处理类,此类在收到注册信令时会通过bind_session()创建SrsGbSession ,也就是先有信令连接再有的SrsGbSession 对象。此类同时为SrsGbSession 提供当前的SIP状态。

       类中对于SIP逻辑处理过程主要是在on_sip_message(SrsSipMessage* msg)中,根接收到的信令做出回应,同时做出自身状态的修改。

      信令的接收解析和信令发送在两个专门类中完,所以类中包含了SrsGbSipTcpReceiver 及 SrsGbSipTcpSender 两个对象,这两个对象都有自己线程,独立运转。

       此类也是有一个状态机,为SrsGbSession的运行提供状态依据。状态的修改也是位于 自身drive_state(msg) 中,此函数的调用是与信令有关的, 它的调用位置只有两个位置,一个是接收命令的处理位置on_sip_message(msg)中,另一个是信令发送时,在放入发送队列的enqueue_sip_message(msg)中。

Sip的状态是比较复杂的,因为整个过程下来环节比较多。我们看下状态的定义:

enum SrsGbSipState
{
    //可切换到established状态
    // sip完成注册,media完成连接之前
    SrsGbSipStateInit = 0,

    //可切换到 inviting 
    //已经完成注册过程,发送invite之前
    SrsGbSipStateRegistered,

    //可切换到 trying 
    //可切换到 stable
    //已经发送 invit 收到trying回复之前
    SrsGbSipStateInviting,

    //可切换到 stable
    //收到trying时,收到 INVITE 200 OK 之前
    SrsGbSipStateTrying,

    //这个状态本想着是用作重invite,但实际是没有使用的,reinvite就是走的invite过程
    SrsGbSipStateReinviting,

    //可切换到re-inviting
    //可切换到bye,当收到设备bye时
    //收到INVITE 200 OK,发送INVITE OK回复给设备之后
    SrsGbSipStateStable,

   //收到BYE后, 直接退出session了,你可以看 SrsGbSession::do_cycle()中有//一个判断;按照GB收到BYE后,应该是断开media的连接回到一个注册完成状//态;但SRS实现不完整所以就直接断开了
    SrsGbSipStateBye,
};

内部drive_state()是状态驱动过程,依据上面状态切换规则进行当前的状态向下一个状态的切换。

void SrsGbSipTcpConn::drive_state(SrsSipMessage* msg)
{
    srs_error_t err = srs_success;

    #define SRS_GB_SIP_CHANGE_STATE_TO(state) { \
        SrsGbSipState ostate = set_state(state); \
        srs_trace("SIP: Change device=%s, state=%s", register_->device_id().c_str(), \
            srs_sip_state(ostate, state_).c_str()); \
    }
    if (state_ == SrsGbSipStateInit) {
        // 如果当前是注册命令,我们将修改状态,让SESSION能进入自动invite的过程
        if(msg->is_register()&&msg->expires_>0)SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateRegistered);
        // 收到 unregister,我们将销毁session,因为可设备要停止流的发布.               
        if(msg->is_register()&&msg->expires_==0)SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateBye);
        // 收到心跳消息,我们切换为stable状态.
        if (msg->is_message()) SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateStable);
    }

    if (state_ == SrsGbSipStateRegistered) {
        if (msg->is_invite()) SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateInviting);
    }

    if (state_ == SrsGbSipStateInviting) {
        if (msg->is_trying()) SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateTrying);
        if (msg->is_invite_ok()) SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateStable);

        //如果设备收到了invite请求并且断开了,它可能会重新注册,这时我们要重新发起INVITE        
        if (msg->is_register()) {
            srs_warn("SIP: Re-invite for got REGISTER in state=%s", srs_gb_sip_state(state_).c_str());
            if ((err = invite_request(NULL)) != srs_success) {
                // TODO: FIXME: Should fail the SIP session.
                srs_freep(err);
            }
        }
    }

    if (state_ == SrsGbSipStateTrying) {
        if (msg->is_invite_ok()) SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateStable);
    }

    if (state_ == SrsGbSipStateStable) {
        //设备发送 bye 或 unregister, 我们将销毁session,因为它可能不再进行流发布
        if (msg->is_register() && msg->expires_ == 0)SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateBye);
        if (msg->is_bye()) SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateBye);
    }

    if (state_ == SrsGbSipStateReinviting) {
        if (msg->is_bye_ok()) SRS_GB_SIP_CHANGE_STATE_TO(SrsGbSipStateInviting);
    }
}

class SrsGbSipTcpReceiver : public ISrsStartable, public ISrsCoroutineHandler

负责数据的接收解析,解析完成后调用SrsGbSipTcpConn::on_sip_message(SrsSipMessage* msg)进行逻辑过程处理,包括信息回复状态驱动等

class SrsGbSipTcpSender : public ISrsStartable, public ISrsCoroutineHandler

负数数据的发在,所有回复的信息都会经enqueue_sip_message(mag)放入例,Sender协程就是循环取出重组进行发送。

3. SrsGbMediaTcpConn

此类就是一个负责接收媒体流的类, 在收到PS流后会根据SSRC查找session进行绑定关联。

PS流的解析主要依赖SrsPackContext类,解析完成后回调到SrsGbMediaTcpConn 及 session的on_ps_pack(pack_, ps, msgs); 完成流的解析。然后依靠SrsGbMuxer进行 rtmp 推流到核心。

4. SrsGbListener

   主要对SIP 和 media 连接端口监听,SRS只实现了TCP的监听连接,在on_tcp_client(ISrsListener* listener, srs_netfd_t stfd)中对进入TCP连接事件处理,根据监听对象的不同生成不同的连接,对SOCKET部分已经在ST协程下进行了封装,此处不再详述。注意在初始化时也有对GB API的添加和监听,此处只有一个public的接口,这是支持扩展服务器媒体流上传的接口。

看一下监听连接的处理

srs_error_t SrsGbListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stfd)
{
    srs_error_t err = srs_success;
    // Handle TCP connections.
    if (listener == sip_listener_) {
        //处理SIP连接	
        SrsGbSipTcpConn* raw_conn = new SrsGbSipTcpConn();
        raw_conn->setup(conf_, sip_listener_, media_listener_, stfd);
        SrsSharedResource<SrsGbSipTcpConn>* conn = new SrsSharedResource<SrsGbSipTcpConn>(raw_conn);
        _srs_gb_manager->add(conn, NULL);
        //建立协程
        SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, conn, raw_conn, raw_conn);
        raw_conn->setup_owner(conn, executor, executor);
        //启动协程
        if ((err = executor->start()) != srs_success) {
            srs_freep(executor);
            return srs_error_wrap(err, "gb sip");
        }
    } else if (listener == media_listener_) {
        //建立媒体流接收连接对象
        SrsGbMediaTcpConn* raw_conn = new SrsGbMediaTcpConn();
        raw_conn->setup(stfd);

        SrsSharedResource<SrsGbMediaTcpConn>* conn = new SrsSharedResource<SrsGbMediaTcpConn>(raw_conn);
        _srs_gb_manager->add(conn, NULL);
        //建立协程
        SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, conn, raw_conn, raw_conn);
        raw_conn->setup_owner(conn, executor, executor);
        //启动协程
        if ((err = executor->start()) != srs_success) {
            srs_freep(executor);
            return srs_error_wrap(err, "gb media");
        }
    } else {
        srs_warn("GB: Ignore TCP client");
        srs_close_stfd(stfd);
    }
    return err;
}

相关文章:

  • AI + 医疗 Qwq大模型离线本地应用
  • 【贝叶斯定理(Bayesian Theorem)】
  • 深入剖析Java虚拟机(JVM):从零开始掌握Java核心引擎
  • flutter doctor提示cmdline-tools component is missing错误的解决
  • 【006安卓开发方案调研】之大厂APP混合开发方案
  • 从零开始学习 Go 语言
  • Android项目实战搭建 MVVM架构
  • 人工智能时代——深度探索如何构建开放可控的专利生态体系
  • Linux笔记---文件系统软件部分
  • 9:内置函数
  • MyBatis-Plus 的加载及初始化
  • JVM垃圾回收笔记02-垃圾回收器
  • ESP32_WiFi连接
  • 重叠构造函数 、JavaBean模式、建造者模式、Spring的隐性大手
  • 《Operating System Concepts》阅读笔记:p460-p4470
  • Redis 事件机制详解
  • 第三十一篇 数据仓库(DW)与商业智能(BI)架构设计与实践指南
  • 流量分配的艺术: 如何设计一款负载均衡组件
  • 大数据环境搭建
  • C++之 【模板初阶(函数模板与类模板)】
  • “世纪火种”嘉年华启动,69家单位加入阅读“朋友圈”
  • 游戏论|迟来的忍者与武士:从《刺客信条:影》论多元话语的争议
  • 广汽集团一季度净亏损7.3亿元,同比转亏,总销量下滑9%
  • 南国置业:控股股东电建地产拟受让公司持有的房地产开发业务等相关资产和负债
  • 美称中美贸易谈判仍在进行中,外交部:美方不要混淆视听
  • 长三角数智文化产业基金意向签约会成功举办