
AWS (Study Notes, Lesson 39): iot-msk-pipeline

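Table of contents

  • AWS (Study Notes, Lesson 39): iot-msk-pipeline
  • What this lesson covers:
    • 1. Overall architecture
      • 1.1 Code link
      • 1.2 Code adjustments
      • 1.2 Overall architecture (overview)
      • 1.3 Overall architecture (details)
    • 2. Code walkthrough
      • 2.1 Creating `IotProducerDestination`
      • 2.2 Creating `IotProducer`
      • 2.3 Creating the `EC2` instance for the `MSK client`
      • 2.4 Creating the `MSK cluster`
      • 2.5 Creating the `Main Stack`
        • 2.5.1 Creating the required `subnet`s
        • 2.5.2 Creating the required `VPC`
        • 2.5.3 Creating the required `cluster` and `client`
        • 2.5.4 Creating the `role` required by the `IoT destination`
        • 2.5.5 Creating the `IoT Message Rule`
    • 3. Running the test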

What this lesson covers:

  • Using AWS IoT together with Amazon MSK

1. Overall architecture

1.1 Code link

  • Code link (iot-msk-lambda-pipeline)

1.2 Code adjustments

  • When creating the cluster, a newer Kafka version has to be specified: kafka_version=msk.KafkaVersion.V3_4_0
    # MSK cluster with SASL/SCRAM authentication
    self.cluster = msk.Cluster(self, "Cluster",
        cluster_name="iotCluster",
        kafka_version=msk.KafkaVersion.V3_4_0,
        vpc=vpc,
        encryption_in_transit=msk.EncryptionInTransitConfig(
            client_broker=msk.ClientBrokerEncryption.TLS
        ),
        client_authentication=msk.ClientAuthentication.sasl(
            scram=True
        ),
    )
    
  • Generate the SCRAM secret
    # 1. Create the Secrets Manager secret (in CDK)
    scram_secret = secretsmanager.Secret(self, "MSKScramSecret",
        secret_name="AmazonMSK_iotCluster_demo",
        generate_secret_string=secretsmanager.SecretStringGenerator(
            secret_string_template=json.dumps({"username": "admin"}),
            generate_string_key="password",
            exclude_characters='"@/\\',  # avoid special characters that cause conflicts
            password_length=16
        )
    )
    
  • Pass the generated SCRAM secret to the secret setting of the MSK cluster's client_authentication
    # MSK cluster with SASL/SCRAM authentication
    self.cluster = msk.Cluster(self, "Cluster",
        cluster_name="iotCluster",
        kafka_version=msk.KafkaVersion.V3_4_0,
        vpc=vpc,
        encryption_in_transit=msk.EncryptionInTransitConfig(
            client_broker=msk.ClientBrokerEncryption.TLS
        ),
        client_authentication=msk.ClientAuthentication.sasl(
            scram=True,
            secret=scram_secret
        ),
    )
    
  • The Kafka version used by the client also has to be a newer one
    constants = {
        "KAFKA_DOWNLOAD_VERSION": "kafka_2.13-4.0.0",
        "KAFKA_VERSION": "4.0.0",
        "KAFKA_CLIENT_INSTANCE": "t2.xlarge",
        "MSK_TOPIC": "topic",
        "IOT_TOPIC": 'iot/topic'
    }
    
  • Because of the version upgrade, requirements.txt has to be updated as well
    aws-cdk-lib==2.101.0
    constructs>=10.0.0,<11.0.0
    aws-cdk.aws-msk-alpha==2.101.0a0
    boto3>=1.28.0
    
  • Newer Kafka versions (2.2+) deprecate the --zookeeper option in favor of --bootstrap-server
    The command passed to instance.user_data.add_commands has to be changed as follows. The client-ssl.properties file it needs is created in a later step.
    ./bin/kafka-topics.sh \
      --create \
      --bootstrap-server b-1.iotcluster.g9od7s.c4.kafka.ap-northeast-1.amazonaws.com:9096 \
      --topic topic \
      --partitions 3 \
      --replication-factor 2 \
      --command-config client-ssl.properties
    
  • Set the JVM heap options for the Kafka tools
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms512M"
  • Create a truststore in Java Keystore (JKS) format
    The AmazonRootCA1.pem file is obtained by running
    wget https://www.amazontrust.com/repository/AmazonRootCA1.pem
    keytool -importcert \
      -alias AmazonMSKCA \
      -file AmazonRootCA1.pem \
      -keystore kafka.client.truststore.jks \
      -storepass 123456 \
      -noprompt
    
  • Generate client-ssl.properties
    cat << EOF > client-ssl.properties
    bootstrap.servers=b-1.iotcluster.g9od7s.c4.kafka.ap-northeast-1.amazonaws.com:9096
    security.protocol=SSL
    ssl.truststore.location=/path/to/kafka.client.truststore.jks
    ssl.truststore.password=123456
    EOF
    
  • Export the KAFKA_OPTS environment variable (using $HOME, because ~ is not expanded inside the quoted -D option)
    export KAFKA_OPTS="-Djavax.net.ssl.trustStore=$HOME/kafka.client.truststore.jks -Djavax.net.ssl.trustStorePassword=123456"
    
  • Try the --list command of kafka-topics.sh
    ./bin/kafka-topics.sh \
      --bootstrap-server b-1.iotcluster.g9od7s.c4.kafka.ap-northeast-1.amazonaws.com:9096 \
      --command-config client-ssl.properties \
      --list
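
The cluster in this lesson enables SASL/SCRAM, and on Amazon MSK port 9096 is the SASL/SCRAM listener, so a client connecting to that port will most likely need SASL_SSL settings in addition to the truststore. The following is only a minimal sketch of rendering such a properties file. The helper name `render_scram_properties`, the broker address and the credentials are placeholders assumed for illustration and are not part of the original project.

```python
def render_scram_properties(bootstrap, username, password,
                            truststore_path, truststore_password):
    """Return Kafka client properties text for a SASL/SCRAM (SASL_SSL) listener."""
    # JAAS line for the SCRAM login module shipped with the Kafka clients.
    jaas = (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{username}" password="{password}";'
    )
    lines = [
        f"bootstrap.servers={bootstrap}",
        "security.protocol=SASL_SSL",
        "sasl.mechanism=SCRAM-SHA-512",
        f"sasl.jaas.config={jaas}",
        f"ssl.truststore.location={truststore_path}",
        f"ssl.truststore.password={truststore_password}",
    ]
    return "\n".join(lines) + "\n"


# Placeholder values (assumptions): replace them with the real broker,
# the username/password stored in the secret, and the truststore path.
properties_text = render_scram_properties(
    bootstrap="b-1.example.com:9096",
    username="admin",
    password="example-password",
    truststore_path="/path/to/kafka.client.truststore.jks",
    truststore_password="123456",
)
print(properties_text)
```

The resulting text could be saved to a file and passed to the Kafka tools through --command-config in the same way as client-ssl.properties.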
    

1.2 Overall architecture (overview)

  • AWS IoT Core produces events and writes them to a topic on the Amazon MSK brokers.
  • The client watches this topic and receives every message that arrives from AWS IoT Core.

1.3 Overall architecture (details)


2. Code walkthrough

2.1 Creating IotProducerDestination

This destination points to the MSK VPC.

# Iot destination which points to the MSK VPC
class IotProducerDestination(NestedStack):

    def __init__(self, scope: Construct, construct_id: str, vpc_id, role_arn, subnet_ids, **kwargs):
        super().__init__(scope, construct_id, **kwargs)

        # Create Iot Messaging Destination for MSK Cluster
        destination = iot.CfnTopicRuleDestination(self, "TopicDestination",
            vpc_properties=iot.CfnTopicRuleDestination.VpcDestinationPropertiesProperty(
                role_arn=role_arn,
                vpc_id=vpc_id,
                subnet_ids=subnet_ids
            )
        )
        self.arn = destination.attr_arn


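2.2 Creating IotProducer

Using the destination created above, this stack goes on to create a rule that points to Kafka, so that every message sent to the IoT destination is redirected to MSK.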

# Iot producer that routes messages to the MSK cluster topic created by the client
class IotProducer(NestedStack):

    def __init__(self, scope: Construct, construct_id: str, vpc_id, role_arn, subnet_ids, bootstrap_brokers_sasl_scram, **kwargs):
        super().__init__(scope, construct_id, **kwargs)

        # Iot destination
        destination = IotProducerDestination(self, "IotProducerTopicDestination",
            role_arn=role_arn,
            vpc_id=vpc_id,
            subnet_ids=subnet_ids
        )

        # Create Iot Messaging Rule for MSK Cluster using the destination ARN
        rule = iot.CfnTopicRule(self, "TopicRule",
            topic_rule_payload=iot.CfnTopicRule.TopicRulePayloadProperty(
                actions=[iot.CfnTopicRule.ActionProperty(
                    kafka=iot.CfnTopicRule.KafkaActionProperty(
                        destination_arn=destination.arn,
                        topic=constants["MSK_TOPIC"],
                        client_properties={
                            'bootstrap.servers': bootstrap_brokers_sasl_scram,
                            'sasl.mechanism': 'SCRAM-SHA-512',
                            'security.protocol': 'SASL_SSL',
                            'sasl.scram.username': "${get_secret('AmazonMSK_iotCluster_demo', 'SecretString', 'username', '" + role_arn + "')}",
                            'sasl.scram.password': "${get_secret('AmazonMSK_iotCluster_demo', 'SecretString', 'password', '" + role_arn + "')}"
                        }
                    )
                )],
                sql='SELECT * FROM "' + constants["IOT_TOPIC"] + '"'
            )
        )
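
The client_properties in the rule above repeat the same `${get_secret(...)}` substitution template for the username and the password. As a minimal sketch, the strings could be composed by small helpers like the ones below. The helper names `get_secret_ref` and `kafka_client_properties`, the broker address and the role ARN are hypothetical values chosen for illustration; only the template layout and the property keys come from the code above.

```python
def get_secret_ref(secret_name, key, role_arn):
    """Build the substitution template that reads one JSON key of a secret."""
    return (
        "${get_secret('" + secret_name + "', 'SecretString', '"
        + key + "', '" + role_arn + "')}"
    )


def kafka_client_properties(bootstrap_brokers, secret_name, role_arn):
    """Assemble the client properties dictionary used by the Kafka rule action."""
    return {
        "bootstrap.servers": bootstrap_brokers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.scram.username": get_secret_ref(secret_name, "username", role_arn),
        "sasl.scram.password": get_secret_ref(secret_name, "password", role_arn),
    }


# Placeholder broker address and role ARN (assumptions for this sketch).
example_role_arn = "arn:aws:iam::123456789012:role/IotTaskRole"
props = kafka_client_properties(
    "b-1.example.com:9096",
    "AmazonMSK_iotCluster_demo",
    example_role_arn,
)
for name, value in props.items():
    print(name, "=", value)
```

Keeping the template in one helper makes it harder for the username and password entries to drift apart when the secret name changes.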

2.3 Creating the EC2 instance for the MSK client

# Ec2 instance that creates the kafka topic and can be used to consume MSK messages
class MSKClient(NestedStack):

    def __init__(self, scope: Construct, construct_id: str, vpc, client_subnet, zookeeper, **kwargs):
        super().__init__(scope, construct_id, **kwargs)

        # Amazon Linux AMI
        amzn_linux = ec2.MachineImage.latest_amazon_linux(
            generation=ec2.AmazonLinuxGeneration.AMAZON_LINUX_2,
            edition=ec2.AmazonLinuxEdition.STANDARD,
            virtualization=ec2.AmazonLinuxVirt.HVM,
            storage=ec2.AmazonLinuxStorage.GENERAL_PURPOSE
        )

        # MSK client Role
        role = iam.Role(self, "InstanceSSM", assumed_by=iam.ServicePrincipal("ec2.amazonaws.com"))

        # AWS managed policy added to MSK client role
        role.add_managed_policy(iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSSMManagedInstanceCore"))

        # MSK Client
        instance = ec2.Instance(self, "Instance",
            instance_type=ec2.InstanceType(constants["KAFKA_CLIENT_INSTANCE"]),
            machine_image=amzn_linux,
            vpc = vpc,
            vpc_subnets=client_subnet,
            role = role,
        )

        # Ec2 security group in the MSK VPC
        client_security_group = ec2.SecurityGroup(self, 'InstanceSecurityGroup', vpc=vpc)

        # Enable connection from anywhere on port 22
        client_security_group.add_ingress_rule(
            ec2.Peer.ipv4('0.0.0.0/0'),
            ec2.Port.tcp(22),
        )
        instance.add_security_group(client_security_group)

        # Commands to install dependencies and create the kafka topic
        instance.user_data.add_commands(
            # "yum install java-1.8.0 -y",
            "yum install java-11-amazon-corretto -y",
            f'wget https://archive.apache.org/dist/kafka/{constants["KAFKA_VERSION"]}/{constants["KAFKA_DOWNLOAD_VERSION"]}.tgz',
            f"tar -xzf {constants['KAFKA_DOWNLOAD_VERSION']}.tgz",
            f"./{constants['KAFKA_DOWNLOAD_VERSION']}/bin/kafka-topics.sh --create --zookeeper {zookeeper} --replication-factor 2 --partitions 1 --topic {constants['MSK_TOPIC']}",
        )
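
The topic-creation command above still uses --zookeeper, while section 1.2 notes that newer Kafka versions expect --bootstrap-server. A rough sketch of how the download, extract and topic-creation commands could be assembled from the constants is shown below. The function name `user_data_commands` and the broker address are assumptions made for this example, Java installation is left out, and the properties file would have to exist on the instance before the last command runs.

```python
import shlex

constants = {
    "KAFKA_DOWNLOAD_VERSION": "kafka_2.13-4.0.0",
    "KAFKA_VERSION": "4.0.0",
    "MSK_TOPIC": "topic",
}


def user_data_commands(bootstrap_servers, command_config="client-ssl.properties"):
    """Return shell commands that fetch Kafka and create the topic."""
    kafka_dir = constants["KAFKA_DOWNLOAD_VERSION"]
    url = (
        "https://archive.apache.org/dist/kafka/"
        f"{constants['KAFKA_VERSION']}/{kafka_dir}.tgz"
    )
    # Same options as the kafka-topics.sh command shown in section 1.2.
    create_topic = [
        f"./{kafka_dir}/bin/kafka-topics.sh",
        "--create",
        "--bootstrap-server", bootstrap_servers,
        "--topic", constants["MSK_TOPIC"],
        "--partitions", "3",
        "--replication-factor", "2",
        "--command-config", command_config,
    ]
    return [
        f"wget {url}",
        f"tar -xzf {kafka_dir}.tgz",
        shlex.join(create_topic),
    ]


# Placeholder broker address (assumption for this sketch).
commands = user_data_commands("b-1.example.com:9096")
for command in commands:
    print(command)
```

A list like this could be passed to instance.user_data.add_commands(*commands) in place of the hard-coded strings.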


2.4 Creating the MSK cluster

# MSK Cluster and client
class MskBroker(NestedStack):

    def __init__(self, scope: Construct, construct_id: str, vpc, client_subnet, **kwargs):
        super().__init__(scope, construct_id, **kwargs)

        # MSK cluster with SASL/SCRAM authentication
        self.cluster = msk.Cluster(self, "Cluster",
            cluster_name="iotCluster",
            kafka_version=msk.KafkaVersion.V3_4_0,
            vpc=vpc,
            encryption_in_transit=msk.EncryptionInTransitConfig(
                client_broker=msk.ClientBrokerEncryption.TLS
            ),
            client_authentication=msk.ClientAuthentication.sasl(
                scram=True
            ),
        )

        # Enable MSK cluster connection on ports 2181 and 9096 for SASL/SCRAM authentication
        self.cluster.connections.allow_from(ec2.Peer.ipv4("0.0.0.0/0"), ec2.Port.tcp(2181))
        self.cluster.connections.allow_from(ec2.Peer.ipv4("0.0.0.0/0"), ec2.Port.tcp(9096))

        # EC2 Instance in the public subnet used to create the topics
        client = MSKClient(self, "MskClient",
            vpc=vpc,
            client_subnet=client_subnet,
            zookeeper=self.cluster.zookeeper_connection_string
        )
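
The security group rules above open port 9096, which Amazon MSK uses for SASL/SCRAM access from inside the VPC (9092 is plaintext, 9094 is TLS and 9098 is IAM). As a small sketch, a bootstrap broker string can be split and checked against that mapping. The function name `parse_bootstrap_brokers` and the broker host names are hypothetical.

```python
# Broker ports used by Amazon MSK for access from within the VPC.
MSK_PORTS = {
    9092: "PLAINTEXT",
    9094: "TLS",
    9096: "SASL/SCRAM",
    9098: "IAM",
}


def parse_bootstrap_brokers(brokers):
    """Split 'host:port,host:port' into a list of (host, port) tuples."""
    pairs = []
    for entry in brokers.split(","):
        host, _, port = entry.strip().rpartition(":")
        pairs.append((host, int(port)))
    return pairs


# Placeholder broker string (assumption for this sketch).
brokers = "b-1.example.com:9096,b-2.example.com:9096"
parsed = parse_bootstrap_brokers(brokers)
for host, port in parsed:
    print(host, port, MSK_PORTS.get(port, "unknown"))
```

A check like this can catch a mismatch early, for example a properties file that targets port 9096 while configuring a different authentication mode.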


2.5 Creating the Main Stack

2.5.1 Creating the required subnets

# 1 Public Subnet and 2 Private Subnets
subnets = []

# Public subnet used to host the MSK Client
subnets.append(ec2.SubnetConfiguration(
    name = "MSKDemo-subnet-public1",
    subnet_type = ec2.SubnetType.PUBLIC,
    cidr_mask = 20
))

# Private subnet hosting one of the two MSK brokers
subnets.append(ec2.SubnetConfiguration(
    name = "MSKDemo-subnet-private1",
    subnet_type = ec2.SubnetType.PRIVATE_WITH_NAT,
    cidr_mask = 20
))

# Private subnet hosting one of the two MSK brokers
subnets.append(ec2.SubnetConfiguration(
    name = "MSKDemo-subnet-private2",
    subnet_type = ec2.SubnetType.PRIVATE_ISOLATED,
    cidr_mask = 20
))


2.5.2 Creating the required VPC

# VPC in which the MSK cluster and client are located
vpc = ec2.Vpc(self, "MskVpc",
    cidr="10.0.0.0/16",
    nat_gateways=1,
    max_azs=2,
    subnet_configuration = subnets
)
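
The VPC uses 10.0.0.0/16, each of the three subnet configurations asks for a /20, and max_azs=2 means each configuration is created once per availability zone. The address arithmetic can be checked with the standard ipaddress module, as sketched below. This only shows how many /20 blocks fit and how many are needed; it does not claim to reproduce the order in which CDK assigns blocks to subnets.

```python
import ipaddress

vpc_cidr = ipaddress.ip_network("10.0.0.0/16")

# All /20 blocks that fit inside the /16 VPC range.
blocks = list(vpc_cidr.subnets(new_prefix=20))

subnet_configurations = 3   # public1, private1, private2
availability_zones = 2      # max_azs=2
needed = subnet_configurations * availability_zones

print("available /20 blocks:", len(blocks))
print("blocks needed:", needed)
print("addresses per block:", blocks[0].num_addresses)
print("first two blocks:", blocks[0], blocks[1])
```

Six of the sixteen available blocks are needed, so the chosen mask leaves room for additional subnets.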


2.5.3 Creating the required cluster and client

# MSK cluster and an EC2 Instance used to create the topics
msk_cluster = MskBroker(self, 'MSKBroker',
    vpc=vpc,
    client_subnet=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC)
).cluster

2.5.4 Creating the role required by the IoT destination

# Policies needed for Iot to send messages to MSK
iot_task_policy = iam.PolicyStatement(
    actions=[
        "ec2:CreateNetworkInterface",
        "ec2:DescribeNetworkInterfaces",
        "ec2:CreateNetworkInterfacePermission",
        "ec2:DeleteNetworkInterface",
        "ec2:DescribeSubnets",
        "ec2:DescribeVpcs",
        "ec2:DescribeVpcAttribute",
        "ec2:DescribeSecurityGroups",
        "secretsmanager:DescribeSecret",
        "secretsmanager:GetSecretValue"
    ],
    resources=["*"]
)

# Role passed to the Iot destination
iot_task_role = iam.Role(self, "IotTaskRole",
    assumed_by=iam.ServicePrincipal('iot.amazonaws.com')
)

# Add the IAM policies above to the task role
iot_task_role.add_to_policy(iot_task_policy)
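
For orientation, the statement and role above correspond roughly to the two IAM JSON documents sketched below: a permissions policy that allows the listed actions on all resources, and a trust policy that lets iot.amazonaws.com assume the role. The variable names are made up for this sketch, and the documents that CDK actually synthesizes may differ in detail.

```python
import json

# Actions copied from the policy statement above.
IOT_ACTIONS = [
    "ec2:CreateNetworkInterface",
    "ec2:DescribeNetworkInterfaces",
    "ec2:CreateNetworkInterfacePermission",
    "ec2:DeleteNetworkInterface",
    "ec2:DescribeSubnets",
    "ec2:DescribeVpcs",
    "ec2:DescribeVpcAttribute",
    "ec2:DescribeSecurityGroups",
    "secretsmanager:DescribeSecret",
    "secretsmanager:GetSecretValue",
]

# Permissions policy: what the role may do.
permissions_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Action": IOT_ACTIONS, "Resource": "*"},
    ],
}

# Trust policy: who may assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "iot.amazonaws.com"},
            "Action": "sts:AssumeRole",
        },
    ],
}

print(json.dumps(permissions_policy, indent=2))
print(json.dumps(trust_policy, indent=2))
```

The ec2 actions let IoT Core manage network interfaces inside the VPC, and the secretsmanager actions let it read the SCRAM credentials.
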

2.5.5 Creating the IoT Message Rule

# Create the Iot Messaging destination and rule
iot_producer = IotProducer(self, 'IotProducer',
    vpc_id=vpc.vpc_id,
    role_arn=iot_task_role.role_arn,
    subnet_ids=vpc.select_subnets(subnet_type=ec2.SubnetType.PRIVATE_WITH_NAT).subnet_ids,
    bootstrap_brokers_sasl_scram=msk_cluster.bootstrap_brokers_sasl_scram
)

3. Running the test

Next, continue with the end-to-end connectivity test along this path:
IoT things -> IoT Core -> IoT Destination -> rule -> MSK cluster -> MSK client
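
One possible way to exercise this path is to publish a test message to the IoT topic with the AWS CLI and read it back on the client with kafka-console-consumer.sh. The sketch below only builds the two command lines. The helper names, the payload, the broker address and the properties file name are assumptions, and the flag names should be checked against the installed CLI and Kafka versions.

```python
import json
import shlex


def publish_command(iot_topic, payload):
    """Build an AWS CLI command that publishes a JSON payload to an IoT topic."""
    return shlex.join([
        "aws", "iot-data", "publish",
        "--topic", iot_topic,
        "--cli-binary-format", "raw-in-base64-out",
        "--payload", json.dumps(payload),
    ])


def consume_command(kafka_dir, bootstrap_servers, msk_topic, config_file):
    """Build a console-consumer command that reads the MSK topic from the start."""
    return shlex.join([
        f"./{kafka_dir}/bin/kafka-console-consumer.sh",
        "--bootstrap-server", bootstrap_servers,
        "--topic", msk_topic,
        "--consumer.config", config_file,
        "--from-beginning",
    ])


# Placeholder payload, broker address and file name (assumptions).
publish = publish_command("iot/topic", {"temp": 21})
consume = consume_command(
    "kafka_2.13-4.0.0", "b-1.example.com:9096", "topic", "client.properties"
)
print(publish)
print(consume)
```

If the rule and destination work, a message published with the first command should appear in the output of the second.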
