当前位置: 首页 > news >正文

.net core集成MQTT服务端

程序作为MQTT的服务端,也是WebApi 接口地址,在Web页面中MQTTJS用的是Websocker协议,在Winfrom中用MQTT协议。导致程序需要启动两个端口。直接上代码

创建服务

引用包:MQTTnet,MQTTnet.AspNetCore,这包最新的5.*,我引用的是4.3.7.1207。5.*的包在接收消息的处理方式略微不同

MqttConfigServer

  public class MqttConfigServer
  {
      /// <summary>
      /// 客户端最大连接数
      /// </summary>
      public int MaxPendingPerClient { get; set; }
      /// <summary>
      /// Mqtt端口
      /// </summary>
      public int MqttPort { get; set; }
      /// <summary>
      /// MqttWs端口
      /// </summary>
      public int MqttWsPort { get; set; }

      /// <summary>
      /// MqttWs路径
      /// </summary>
      public string MqttWsPath { get; set; }
  }

这是拓展的方法,添加mqtt服务
AppInfo.GetOptions() 这是获取配置文件

 /// <summary>
 /// 添加mqtt服务
 /// </summary>
 /// <param name="services"></param>
 /// <param name="webHostBuilder"></param>
 public static void AddMqttService(this IServiceCollection services, IWebHostBuilder webHostBuilder)
 {
     var mqttServerConfig = AppInfo.GetOptions<MqttConfigServer>();

     webHostBuilder.ConfigureKestrel((context, serverOptions) =>
     {
         //配置Mqtt端口
         serverOptions.ListenAnyIP(mqttServerConfig.MqttPort, options => options.UseMqtt());
         //配置web端口
         serverOptions.ListenAnyIP(mqttServerConfig.MqttWsPort);
     });

     //配置MQTT服务
     services.AddHostedMqttServerWithServices(options =>
     {
         options.WithoutDefaultEndpoint();
         options.WithMaxPendingMessagesPerClient(mqttServerConfig.MaxPendingPerClient);
         options.WithKeepAlive();
     });
     services.AddMqttConnectionHandler();
     services.AddConnections();
	 //注入单例mqtt侦听服务
     services.AddSingleton<MyMqttService>();
 }

MyMqttService

public class MyMqttService
{
    public MyMqttService()
    { }
    public Task Server_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
    {
        Console.WriteLine($"客户端:【{arg.SenderId}】,主题【{arg.ApplicationMessage.Topic}】收到消息:{System.Text.Encoding.Default.GetString(arg.ApplicationMessage.PayloadSegment)}");
        return Task.CompletedTask;
    }
    /// <summary>
    /// 客户端验证账号密码
    /// </summary>
    /// <param name="arg"></param>
    /// <returns></returns>
    public Task Server_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
    {
        Console.WriteLine($"{arg.ClientId} 上线了,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
        return Task.CompletedTask;
    }
    /// <summary>
    /// 侦听所有消息
    /// </summary>
    /// <param name="arg"></param>
    /// <returns></returns>
    public async Task Server_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
    {
        Console.WriteLine($"侦听所有消息:【{arg.ClientId}】,主题【{arg.ApplicationMessage.Topic}】收到消息:{System.Text.Encoding.Default.GetString(arg.ApplicationMessage.PayloadSegment)}");

        await Task.CompletedTask;
        return;
    }
    /// <summary>
    /// 客户端断开连接
    /// </summary>
    /// <param name="arg"></param>
    /// <returns></returns>
    public Task Server_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
    {
        Console.WriteLine($"【{arg.ClientId}】 下线了,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// 客户端上线
    /// </summary>
    /// <param name="arg"></param>
    /// <returns></returns>
    public Task Server_ClientConnectedAsync(ClientConnectedEventArgs arg)
    {
        Console.WriteLine($"{arg.ClientId} 上线了,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
        return Task.CompletedTask;
    }
    /// <summary>
    /// 客户端取消订阅
    /// </summary>
    /// <param name="arg"></param>
    /// <returns></returns>
    public Task Server_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
    {
        return Task.CompletedTask;
    }
    /// <summary>
    /// 客户端订阅
    /// </summary>
    /// <param name="arg"></param>
    /// <returns></returns>
    public Task Server_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
    {
        return Task.CompletedTask;
    }
    /// <summary>
    /// 客户端保留消息
    /// </summary>
    /// <param name="arg"></param>
    /// <returns></returns>
    public Task Server_RetainedMessageChangedAsync(RetainedMessageChangedEventArgs arg)
    {
        Console.WriteLine($"收到消息:{arg.ClientId}");
        return Task.CompletedTask;
    }
}

在Program 添加mqtt 服务

  //添加Mqtt服务
    services.AddMqttService(builder.WebHost);
    var mqttServerConfig = AppInfo.GetOptions<MqttConfigServer>();

  //WsMqtt 路径
  app.MapConnectionHandler<MqttConnectionHandler>(mqttServerConfig.MqttWsPath, http => http.WebSockets.SubProtocolSelector = protocolList => protocolList.FirstOrDefault() ?? string.Empty);
  var mqttService = app.Services.GetRequiredService<MyMqttService>();
  //加载MQTT回调
  app.UseMqttServer(server =>
  {
      //验证账号密码
      server.ValidatingConnectionAsync += mqttService.Server_ValidatingConnectionAsync;
      //侦听为消费消息
      server.ApplicationMessageNotConsumedAsync += mqttService.Server_ApplicationMessageNotConsumedAsync;
      //侦听所有消息
      server.InterceptingPublishAsync += mqttService.Server_InterceptingPublishAsync;
      //客户端订阅消息
      server.ClientSubscribedTopicAsync += mqttService.Server_ClientSubscribedTopicAsync;
      //客户端取消订阅消息
      server.ClientUnsubscribedTopicAsync += mqttService.Server_ClientUnsubscribedTopicAsync;
      //客户端链接事件
      server.ClientConnectedAsync += mqttService.Server_ClientConnectedAsync;
      //客户端关闭事件
      server.ClientDisconnectedAsync += mqttService.Server_ClientDisconnectedAsync;
      //保留消息
      server.RetainedMessageChangedAsync += mqttService.Server_RetainedMessageChangedAsync;
  });

mqttconfig

{
    "maxPendingPerClient": 1000, //客户端最大等待连接数
    "mqttPort": 1883, //mqtt端口
    "mqttWsPort": 8083, //mqtt websocket端口
    "mqttWsPath": "/mqtt" //mqtt websocket路径

  }

这样服务就完成,这里mqtt 服务并不影响WebApi 的接口,只是启动的两个端口

服务端主动推送消息

 private IServiceProvider _serviceProvider;
 public ClientControlController(IServiceProvider serviceProvider)
 {
     _serviceProvider = serviceProvider;
 }

  /// <summary>
  /// 服务端推送消息
  /// </summary>
  /// <returns></returns>
  private async Task PushMsg(string topic, ClientControl control)
  {
      var server = _serviceProvider.GetService<MqttHostedServer>() as MqttServer;
      //获取客户端
      var client = (await server?.GetClientsAsync()).Where(m => m.Id == "HardwareService").FirstOrDefault();
	  //推送消息
      var payload = JsonConvert.SerializeObject(control);

      var message = new MqttApplicationMessageBuilder()
          .WithTopic(topic)//主题
          .WithPayload(payload)
          .Build();

      if (client != null)
      {
          await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(message)
          {
              SenderClientId = client.Id,

          });
      }
  }

相关文章:

  • Pytorch中的torch.utils.data.Dataset 类
  • Next-Auth 认证系统:用户与管理员双角色登录配置
  • 【深度技术揭秘】 Android SystemUI锁屏界面动态布局重构:横竖屏智能适配指南
  • 【最后203篇系列】022 用Deepseek14b提取新闻事件
  • 官方通知 | 2025年CAIP人工智能职场应用师(AI职场应用师)职业能力认证正式发布
  • 【机器学习】机器学习四大分类
  • Camera2 与 CameraX 闲谈
  • 【惯性系与固连系速度位置加速度转换关系】
  • Redis 内存淘汰策略
  • Compose 原理解析
  • 【信息系统项目管理师】【高分范文】【历年真题】​论信息系统项目的风险管理
  • 基于大模型的甲状舌管囊肿全流程预测与临床方案研究报告
  • 【第22节】windows网络编程模型(WSAAsyncSelect模型)
  • 【江协科技STM32】软件SPI读写W25Q64芯片(学习笔记)
  • 小米AX6000解锁ssh避坑笔记
  • 【java面试】线程篇
  • AC交流采样电路
  • DL学习笔记:穿戴设备上的轻量级人体活动识别方法
  • AI Agent开发大全第四课-提示语工程:从简单命令到AI对话的“魔法”公式
  • 【赵渝强老师】在Docker中运行达梦数据库
  • 《卿本著者》译后记等内容被指表述不当,江苏人民出版社:即日下架
  • 人民论坛:是民生小事,也是融合大势
  • 巴基斯坦召开国家安全委员会紧急会议,应对印方连环举措
  • 研究|和去年相比,人们使用AI的需求发生了哪些变化?
  • 张文宏团队公布广谱抗猴痘药物研发进展,将进入临床审批阶段
  • 上海车展上的双向奔赴:跨国车企融入中国创新,联手“在中国,为全球”