当前位置: 首页 > news >正文

CSI-external-provisioner

main()

这段Go代码是一个CSI(容器存储接口)Provisioner(供应器)的实现,用于在Kubernetes集群中动态提供持久卷。代码涉及多个组件和步骤,下面是对关键部分的解释:

  1. 初始化和配置
  • 命令行标志和环境变量:代码使用flag包处理命令行参数,如feature-gateskubeconfig等。同时,从环境变量中获取配置,如NODE_NAME
  • 日志和度量:使用klog进行日志记录,并配置Prometheus度量收集器。
  1. Kubernetes客户端配置
  • 构建KubeConfig:根据是否提供了masterkubeconfig参数,决定是使用集群内配置(rest.InClusterConfig)还是外部配置(clientcmd.BuildConfigFromFlags)。
  • 客户端创建:使用配置创建Kubernetes客户端(kubernetes.NewForConfig)和CSI快照客户端(snapclientset.NewForConfig)。
  1. CSI驱动连接和验证
  • 连接到CSI驱动:通过gRPC连接到CSI驱动,并进行基本的探测(ctrl.Probe)以确保驱动可用。
  • 获取驱动名称和能力:从CSI驱动获取驱动名称(ctrl.GetDriverName)和能力(ctrl.GetDriverCapabilities)。
  1. 拓扑和节点信息
  • 拓扑支持:如果CSI驱动支持拓扑,则创建相应的informer来监视节点和CSINode对象。
  • 节点部署:如果启用了节点部署(--enable-node-deployment),则获取节点信息并配置节点部署对象。
  1. Provisioner和Controller创建
  • Provisioner创建:使用获取的配置和客户端创建CSI Provisioner对象,该对象实现了Provisioner接口。
  • 容量控制器:如果启用了容量功能(--enable-capacity),则创建容量控制器来发布存储容量信息。
  1. HTTP服务器和度量
  • HTTP服务器:如果指定了度量地址(--metrics-address)或HTTP端点(--http-endpoint),则启动HTTP服务器来暴露度量和可能的调试端点(如pprof)。
  1. Informers和队列
  • Informer和队列:创建各种资源的Informer来监视Kubernetes对象的变化,并使用工作队列处理事件。
  1. 运行
  • 启动Informer和控制器:启动Informer工厂和控制器,开始监视和处理事件。

总结
这段代码是一个复杂的CSI Provisioner实现,它集成了Kubernetes客户端、CSI驱动、度量收集、拓扑感知、容量管理等多个组件。通过精心设计的架构和模块化的代码,它能够在Kubernetes集群中高效地提供和管理持久卷。

func main() {
	var config *rest.Config
	var err error

	flag.Var(utilflag.NewMapStringBool(&featureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features. "+
		"Options are:\n"+strings.Join(utilfeature.DefaultFeatureGate.KnownFeatures(), "\n"))

	klog.InitFlags(nil)
	flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
	flag.Set("logtostderr", "true")
	flag.Parse()

	ctx := context.Background()

	if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(featureGates); err != nil {
		klog.Fatal(err)
	}

	node := os.Getenv("NODE_NAME")
	if *enableNodeDeployment && node == "" {
		klog.Fatal("The NODE_NAME environment variable must be set when using --enable-node-deployment.")
	}

	if *showVersion {
		fmt.Println(os.Args[0], version)
		os.Exit(0)
	}
	klog.Infof("Version: %s", version)

	if *metricsAddress != "" && *httpEndpoint != "" {
		klog.Error("only one of `--metrics-address` and `--http-endpoint` can be set.")
		os.Exit(1)
	}
	addr := *metricsAddress
	if addr == "" {
		addr = *httpEndpoint
	}

	// get the KUBECONFIG from env if specified (useful for local/debug cluster)
	kubeconfigEnv := os.Getenv("KUBECONFIG")

	if kubeconfigEnv != "" {
		klog.Infof("Found KUBECONFIG environment variable set, using that..")
		kubeconfig = &kubeconfigEnv
	}

	if *master != "" || *kubeconfig != "" {
		klog.Infof("Either master or kubeconfig specified. building kube config from that..")
		config, err = clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
	} else {
		klog.Infof("Building kube configs for running in cluster...")
		config, err = rest.InClusterConfig()
	}
	if err != nil {
		klog.Fatalf("Failed to create config: %v", err)
	}

	config.QPS = *kubeAPIQPS
	config.Burst = *kubeAPIBurst

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatalf("Failed to create client: %v", err)
	}

	// snapclientset.NewForConfig creates a new Clientset for  VolumesnapshotV1Client
	snapClient, err := snapclientset.NewForConfig(config)
	if err != nil {
		klog.Fatalf("Failed to create snapshot client: %v", err)
	}

	var gatewayClient gatewayclientset.Interface
	if utilfeature.DefaultFeatureGate.Enabled(features.CrossNamespaceVolumeDataSource) {
		// gatewayclientset.NewForConfig creates a new Clientset for GatewayClient
		gatewayClient, err = gatewayclientset.NewForConfig(config)
		if err != nil {
			klog.Fatalf("Failed to create gateway client: %v", err)
		}
	}

	metricsManager := metrics.NewCSIMetricsManagerWithOptions("", /* driverName */
		// Will be provided via default gatherer.
		metrics.WithProcessStartTime(false),
		metrics.WithSubsystem(metrics.SubsystemSidecar),
	)

	grpcClient, err := ctrl.Connect(ctx, *csiEndpoint, metricsManager)
	if err != nil {
		klog.Error(err.Error())
		os.Exit(1)
	}

	err = ctrl.Probe(ctx, grpcClient, *operationTimeout)
	if err != nil {
		klog.Error(err.Error())
		os.Exit(1)
	}

	// Autodetect provisioner name
	provisionerName, err := ctrl.GetDriverName(grpcClient, *operationTimeout)
	if err != nil {
		klog.Fatalf("Error getting CSI driver name: %s", err)
	}
	klog.V(2).Infof("Detected CSI driver %s", provisionerName)
	metricsManager.SetDriverName(provisionerName)

	translator := csitrans.New()
	supportsMigrationFromInTreePluginName := ""
	if translator.IsMigratedCSIDriverByName(provisionerName) {
		supportsMigrationFromInTreePluginName, err = translator.GetInTreeNameFromCSIName(provisionerName)
		if err != nil {
			klog.Fatalf("Failed to get InTree plugin name for migrated CSI plugin %s: %v", provisionerName, err)
		}
		klog.V(2).Infof("Supports migration from in-tree plugin: %s", supportsMigrationFromInTreePluginName)

		// Create a new connection with the metrics manager with migrated label
		metricsManager = metrics.NewCSIMetricsManagerWithOptions(provisionerName,
			// Will be provided via default gatherer.
			metrics.WithProcessStartTime(false),
			metrics.WithMigration())
		migratedGrpcClient, err := ctrl.Connect(ctx, *csiEndpoint, metricsManager)
		if err != nil {
			klog.Error(err.Error())
			os.Exit(1)
		}
		grpcClient.Close()
		grpcClient = migratedGrpcClient

		err = ctrl.Probe(ctx, grpcClient, *operationTimeout)
		if err != nil {
			klog.Error(err.Error())
			os.Exit(1)
		}
	}

	// Prepare http endpoint for metrics + leader election healthz
	mux := http.NewServeMux()
	gatherers := prometheus.Gatherers{
		// For workqueue and leader election metrics, set up via the anonymous imports of:
		// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/metrics/prometheus/workqueue/metrics.go
		// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/leaderelection/metrics.go
		//
		// Also to happens to include Go runtime and process metrics:
		// https://github.com/kubernetes/kubernetes/blob/9780d88cb6a4b5b067256ecb4abf56892093ee87/staging/src/k8s.io/component-base/metrics/legacyregistry/registry.goL46-L49
		legacyregistry.DefaultGatherer,
		// For CSI operations.
		metricsManager.GetRegistry(),
	}

	pluginCapabilities, controllerCapabilities, err := ctrl.GetDriverCapabilities(grpcClient, *operationTimeout)
	if err != nil {
		klog.Fatalf("Error getting CSI driver capabilities: %s", err)
	}

	// Generate a unique ID for this provisioner
	timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
	identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName
	if *enableNodeDeployment {
		identity = identity + "-" + node
	}

	factory := informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
	var factoryForNamespace informers.SharedInformerFactory // usually nil, only used for CSIStorageCapacity

	// -------------------------------
	// Listers
	// Create informer to prevent hit the API server for all resource request
	scLister := factory.Storage().V1().StorageClasses().Lister()
	claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()

	var vaLister storagelistersv1.VolumeAttachmentLister
	if controllerCapabilities[csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME] {
		klog.Info("CSI driver supports PUBLISH_UNPUBLISH_VOLUME, watching VolumeAttachments")
		vaLister = factory.Storage().V1().VolumeAttachments().Lister()
	} else {
		klog.Info("CSI driver does not support PUBLISH_UNPUBLISH_VOLUME, not watching VolumeAttachments")
	}

	var nodeDeployment *ctrl.NodeDeployment
	if *enableNodeDeployment {
		nodeDeployment = &ctrl.NodeDeployment{
			NodeName:         node,
			ClaimInformer:    factory.Core().V1().PersistentVolumeClaims(),
			ImmediateBinding: *nodeDeploymentImmediateBinding,
			BaseDelay:        *nodeDeploymentBaseDelay,
			MaxDelay:         *nodeDeploymentMaxDelay,
		}
		nodeInfo, err := ctrl.GetNodeInfo(grpcClient, *operationTimeout)
		if err != nil {
			klog.Fatalf("Failed to get node info from CSI driver: %v", err)
		}
		nodeDeployment.NodeInfo = *nodeInfo
	}

	var nodeLister listersv1.NodeLister
	var csiNodeLister storagelistersv1.CSINodeLister
	if ctrl.SupportsTopology(pluginCapabilities) {
		if nodeDeployment != nil {
			// Avoid watching in favor of fake, static objects. This is particularly relevant for
			// Node objects, which can generate significant traffic.
			csiNode := &storagev1.CSINode{
				ObjectMeta: metav1.ObjectMeta{
					Name: nodeDeployment.NodeName,
				},
				Spec: storagev1.CSINodeSpec{
					Drivers: []storagev1.CSINodeDriver{
						{
							Name:   provisionerName,
							NodeID: nodeDeployment.NodeInfo.NodeId,
						},
					},
				},
			}
			node := &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: nodeDeployment.NodeName,
				},
			}
			if nodeDeployment.NodeInfo.AccessibleTopology != nil {
				for key := range nodeDeployment.NodeInfo.AccessibleTopology.Segments {
					csiNode.Spec.Drivers[0].TopologyKeys = append(csiNode.Spec.Drivers[0].TopologyKeys, key)
				}
				node.Labels = nodeDeployment.NodeInfo.AccessibleTopology.Segments
			}
			klog.Infof("using local topology with Node = %+v and CSINode = %+v", node, csiNode)

			// We make those fake objects available to the topology code via informers which
			// never change.
			stoppedFactory := informers.NewSharedInformerFactory(clientset, 1000*time.Hour)
			csiNodes := stoppedFactory.Storage().V1().CSINodes()
			nodes := stoppedFactory.Core().V1().Nodes()
			csiNodes.Informer().GetStore().Add(csiNode)
			nodes.Informer().GetStore().Add(node)
			csiNodeLister = csiNodes.Lister()
			nodeLister = nodes.Lister()

		} else {
			csiNodeLister = factory.Storage().V1().CSINodes().Lister()
			nodeLister = factory.Core().V1().Nodes().Lister()
		}
	}

	var referenceGrantLister referenceGrantv1beta1.ReferenceGrantLister
	var gatewayFactory gatewayInformers.SharedInformerFactory
	if utilfeature.DefaultFeatureGate.Enabled(features.CrossNamespaceVolumeDataSource) {
		gatewayFactory = gatewayInformers.NewSharedInformerFactory(gatewayClient, ctrl.ResyncPeriodOfReferenceGrantInformer)
		referenceGrants := gatewayFactory.Gateway().V1beta1().ReferenceGrants()
		referenceGrantLister = referenceGrants.Lister()
	}

	// -------------------------------
	// PersistentVolumeClaims informer
	rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)
	claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
	claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()

	// Setup options
	provisionerOptions := []func(*controller.ProvisionController) error{
		controller.LeaderElection(false), // Always disable leader election in provisioner lib. Leader election should be done here in the CSI provisioner level instead.
		controller.FailedProvisionThreshold(0),
		controller.FailedDeleteThreshold(0),
		controller.RateLimiter(rateLimiter),
		controller.Threadiness(int(*workerThreads)),
		controller.CreateProvisionedPVLimiter(workqueue.DefaultControllerRateLimiter()),
		controller.ClaimsInformer(claimInformer),
		controller.NodesLister(nodeLister),
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.HonorPVReclaimPolicy) {
		provisionerOptions = append(provisionerOptions, controller.AddFinalizer(true))
	}

	if supportsMigrationFromInTreePluginName != "" {
		provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
	}

	// Create the provisioner: it implements the Provisioner interface expected by
	// the controller
	csiProvisioner := ctrl.NewCSIProvisioner(
		clientset,
		*operationTimeout,
		identity,
		*volumeNamePrefix,
		*volumeNameUUIDLength,
		grpcClient,
		snapClient,
		provisionerName,
		pluginCapabilities,
		controllerCapabilities,
		supportsMigrationFromInTreePluginName,
		*strictTopology,
		*immediateTopology,
		translator,
		scLister,
		csiNodeLister,
		nodeLister,
		claimLister,
		vaLister,
		referenceGrantLister,
		*extraCreateMetadata,
		*defaultFSType,
		nodeDeployment,
		*controllerPublishReadOnly,
		*preventVolumeModeConversion,
	)

	var capacityController *capacity.Controller
	if *enableCapacity {
		// Publishing storage capacity information uses its own client
		// with separate rate limiting.
		config.QPS = *kubeAPICapacityQPS
		config.Burst = *kubeAPICapacityBurst
		clientset, err := kubernetes.NewForConfig(config)
		if err != nil {
			klog.Fatalf("Failed to create client: %v", err)
		}

		namespace := os.Getenv("NAMESPACE")
		if namespace == "" {
			klog.Fatal("need NAMESPACE env variable for CSIStorageCapacity objects")
		}
		var controller *metav1.OwnerReference
		if *capacityOwnerrefLevel >= 0 {
			podName := os.Getenv("POD_NAME")
			if podName == "" {
				klog.Fatal("need POD_NAME env variable to determine CSIStorageCapacity owner")
			}
			var err error
			controller, err = owner.Lookup(config, namespace, podName,
				schema.GroupVersionKind{
					Group:   "",
					Version: "v1",
					Kind:    "Pod",
				}, *capacityOwnerrefLevel)
			if err != nil {
				klog.Fatalf("look up owner(s) of pod: %v", err)
			}
			klog.Infof("using %s/%s %s as owner of CSIStorageCapacity objects", controller.APIVersion, controller.Kind, controller.Name)
		}

		var topologyInformer topology.Informer
		if nodeDeployment == nil {
			topologyInformer = topology.NewNodeTopology(
				provisionerName,
				clientset,
				factory.Core().V1().Nodes(),
				factory.Storage().V1().CSINodes(),
				workqueue.NewNamedRateLimitingQueue(rateLimiter, "csitopology"),
			)
		} else {
			var segment topology.Segment
			if nodeDeployment.NodeInfo.AccessibleTopology != nil {
				for key, value := range nodeDeployment.NodeInfo.AccessibleTopology.Segments {
					segment = append(segment, topology.SegmentEntry{Key: key, Value: value})
				}
			}
			klog.Infof("producing CSIStorageCapacity objects with fixed topology segment %s", segment)
			topologyInformer = topology.NewFixedNodeTopology(&segment)
		}
		go topologyInformer.RunWorker(ctx)

		managedByID := "external-provisioner"
		if *enableNodeDeployment {
			managedByID = getNameWithMaxLength(managedByID, node, validation.DNS1035LabelMaxLength)
		}

		// We only need objects from our own namespace. The normal factory would give
		// us an informer for the entire cluster. We can further restrict the
		// watch to just those objects with the right labels.
		factoryForNamespace = informers.NewSharedInformerFactoryWithOptions(clientset,
			ctrl.ResyncPeriodOfCsiNodeInformer,
			informers.WithNamespace(namespace),
			informers.WithTweakListOptions(func(lo *metav1.ListOptions) {
				lo.LabelSelector = labels.Set{
					capacity.DriverNameLabel: provisionerName,
					capacity.ManagedByLabel:  managedByID,
				}.AsSelector().String()
			}),
		)

		// We use the V1 CSIStorageCapacity API if available.
		clientFactory := capacity.NewV1ClientFactory(clientset)
		cInformer := factoryForNamespace.Storage().V1().CSIStorageCapacities()

		// This invalid object is used in a v1 Create call to determine
		// based on the resulting error whether the v1 API is supported.
		invalidCapacity := &storagev1.CSIStorageCapacity{
			ObjectMeta: metav1.ObjectMeta{
				Name: "%123-invalid-name",
			},
		}
		createdCapacity, err := clientset.StorageV1().CSIStorageCapacities(namespace).Create(ctx, invalidCapacity, metav1.CreateOptions{})
		switch {
		case err == nil:
			klog.Fatalf("creating an invalid v1.CSIStorageCapacity didn't fail as expected, got: %s", createdCapacity)
		case apierrors.IsNotFound(err):
			// We need to bridge between the v1beta1 API on the
			// server and the v1 API expected by the capacity code.
			klog.Info("using the CSIStorageCapacity v1beta1 API")
			clientFactory = capacity.NewV1beta1ClientFactory(clientset)
			cInformer = capacity.NewV1beta1InformerBridge(factoryForNamespace.Storage().V1beta1().CSIStorageCapacities())
		case apierrors.IsInvalid(err):
			klog.Info("using the CSIStorageCapacity v1 API")
		default:
			klog.Fatalf("unexpected error when checking for the V1 CSIStorageCapacity API: %v", err)
		}

		capacityController = capacity.NewCentralCapacityController(
			csi.NewControllerClient(grpcClient),
			provisionerName,
			clientFactory,
			// Metrics for the queue is available in the default registry.
			workqueue.NewNamedRateLimitingQueue(rateLimiter, "csistoragecapacity"),
			controller,
			managedByID,
			namespace,
			topologyInformer,
			factory.Storage().V1().StorageClasses(),
			cInformer,
			*capacityPollInterval,
			*capacityImmediateBinding,
			*operationTimeout,
		)
		legacyregistry.CustomMustRegister(capacityController)

		// Wrap Provision and Delete to detect when it is time to refresh capacity.
		csiProvisioner = capacity.NewProvisionWrapper(csiProvisioner, capacityController)
	}

	if addr != "" {
		// Start HTTP server, regardless whether we are the leader or not.
		// Register provisioner metrics manually to be able to add multiplexer in front of it
		m := libmetrics.New("controller")
		reg := prometheus.NewRegistry()
		reg.MustRegister([]prometheus.Collector{
			m.PersistentVolumeClaimProvisionTotal,
			m.PersistentVolumeClaimProvisionFailedTotal,
			m.PersistentVolumeClaimProvisionDurationSeconds,
			m.PersistentVolumeDeleteTotal,
			m.PersistentVolumeDeleteFailedTotal,
			m.PersistentVolumeDeleteDurationSeconds,
		}...)
		provisionerOptions = append(provisionerOptions, controller.MetricsInstance(m))
		gatherers = append(gatherers, reg)

		// This is similar to k8s.io/component-base/metrics HandlerWithReset
		// except that we gather from multiple sources. This is necessary
		// because both CSI metrics manager and component-base manage
		// their own registry. Probably could be avoided by making
		// CSI metrics manager a bit more flexible.
		mux.Handle(*metricsPath,
			promhttp.InstrumentMetricHandler(
				reg,
				promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{})))

		if *enableProfile {
			klog.InfoS("Starting profiling", "endpoint", httpEndpoint)

			mux.HandleFunc("/debug/pprof/", pprof.Index)
			mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
			mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
			mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
			mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
		}
		go func() {
			klog.Infof("ServeMux listening at %q", addr)
			err := http.ListenAndServe(addr, mux)
			if err != nil {
				klog.Fatalf("Failed to start HTTP server at specified address (%q) and metrics path (%q): %s", addr, *metricsPath, err)
			}
		}()
	}

	logger := klog.FromContext(ctx)
	provisionController = controller.NewProvisionController(
		logger,
		clientset,
		provisionerName,
		csiProvisioner,
		provisionerOptions...,
	)

	csiClaimController := ctrl.NewCloningProtectionController(
		clientset,
		claimLister,
		claimInformer,
		claimQueue,
		controllerCapabilities,
	)

	run := func(ctx context.Context) {
		factory.Start(ctx.Done())
		if factoryForNamespace != nil {
			// Starting is enough, the capacity controller will
			// wait for sync.
			factoryForNamespace.Start(ctx.Done())
		}
		cacheSyncResult := factory.WaitForCacheSync(ctx.Done())
		for _, v := range cacheSyncResult {
			if !v {
				klog.Fatalf("Failed to sync Informers!")
			}
		}

		if utilfeature.DefaultFeatureGate.Enabled(features.CrossNamespaceVolumeDataSource) {
			if gatewayFactory != nil {
				gatewayFactory.Start(ctx.Done())
			}
			gatewayCacheSyncResult := gatewayFactory.WaitForCacheSync(ctx.Done())
			for _, v := range gatewayCacheSyncResult {
				if !v {
					klog.Fatalf("Failed to sync Informers for gateway!")
				}
			}
		}

		if capacityController != nil {
			go capacityController.Run(ctx, int(*capacityThreads))
		}
		if csiClaimController != nil {
			go csiClaimController.Run(ctx, int(*finalizerThreads))
		}
		provisionController.Run(ctx)
	}

	if !*enableLeaderElection {
		run(ctx)
	} else {
		// this lock name pattern is also copied from sigs.k8s.io/sig-storage-lib-external-provisioner/controller
		// to preserve backwards compatibility
		lockName := strings.Replace(provisionerName, "/", "-", -1)

		// create a new clientset for leader election
		leClientset, err := kubernetes.NewForConfig(config)
		if err != nil {
			klog.Fatalf("Failed to create leaderelection client: %v", err)
		}

		le := leaderelection.NewLeaderElection(leClientset, lockName, run)
		if *httpEndpoint != "" {
			le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout)
		}

		if *leaderElectionNamespace != "" {
			le.WithNamespace(*leaderElectionNamespace)
		}

		le.WithLeaseDuration(*leaderElectionLeaseDuration)
		le.WithRenewDeadline(*leaderElectionRenewDeadline)
		le.WithRetryPeriod(*leaderElectionRetryPeriod)
		le.WithIdentity(identity)

		if err := le.Run(); err != nil {
			klog.Fatalf("failed to initialize leader election: %v", err)
		}
	}
}

NewProvisionController()

  1. 获取主机名和生成唯一ID
    • 使用os.Hostname()获取当前主机名,如果获取失败,则记录错误日志并退出程序。
    • 将主机名与一个UUID结合,生成一个唯一的ID,以避免在同一主机上运行的多个进程发生冲突。
  2. 初始化事件记录器
    • 使用record.NewBroadcaster()创建一个事件广播器,并配置其进行结构化日志记录和事件记录。
    • 创建一个eventRecorder,用于记录事件。
  3. 创建并初始化ProvisionController实例
    • 初始化ProvisionController结构体,包括客户端、供应器名称、供应器实现、ID、组件名、事件记录器等字段。
    • 设置一系列默认值,如重同步周期、错误时的指数退避策略、线程数、失败阈值等。
    • 初始化指标相关配置。
  4. 处理选项函数
    • 遍历传入的选项函数列表,对每个函数进行调用,以配置ProvisionController实例。如果某个选项函数执行失败,则记录错误日志并退出程序。
  5. 初始化速率限制器和工作队列
    • 根据配置创建速率限制器,并用于初始化claimQueuevolumeQueue两个工作队列。
  6. 初始化Informer和事件处理器
    • 使用informers.NewSharedInformerFactory创建共享Informer工厂。
    • 为PersistentVolumeClaims(PVCs)、PersistentVolumes(PVs)和StorageClasses分别设置事件处理器和Indexer。
    • Informer用于监听Kubernetes资源的变化,并根据变化触发相应的事件处理函数。
  7. 初始化VolumeStore
    • 根据配置选择使用NewVolumeStoreQueueNewBackoffStore来初始化volumeStore,用于处理PV的创建和保存逻辑。
// NewProvisionController creates a new provision controller using
// the given configuration parameters and with private (non-shared) informers.
func NewProvisionController(
	logger klog.Logger,
	client kubernetes.Interface,
	provisionerName string,
	provisioner Provisioner,
	options ...func(*ProvisionController) error,
) *ProvisionController {
	id, err := os.Hostname()
	if err != nil {
		logger.Error(err, "Error getting hostname")
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}
	// add a uniquifier so that two processes on the same host don't accidentally both become active
	id = id + "_" + string(uuid.NewUUID())
	component := provisionerName + "_" + id

	// TODO: Once the following PR is merged, change to use StartLogging and StartRecordingToSinkWithContext
	// https://github.com/kubernetes/kubernetes/pull/120729
	v1.AddToScheme(scheme.Scheme)
	broadcaster := record.NewBroadcaster()
	broadcaster.StartStructuredLogging(0)
	broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.CoreV1().Events(v1.NamespaceAll)})
	eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: component})

	controller := &ProvisionController{
		client:                    client,
		provisionerName:           provisionerName,
		provisioner:               provisioner,
		id:                        id,
		component:                 component,
		eventRecorder:             eventRecorder,
		resyncPeriod:              DefaultResyncPeriod,
		exponentialBackOffOnError: DefaultExponentialBackOffOnError,
		threadiness:               DefaultThreadiness,
		failedProvisionThreshold:  DefaultFailedProvisionThreshold,
		failedDeleteThreshold:     DefaultFailedDeleteThreshold,
		leaderElection:            DefaultLeaderElection,
		leaderElectionNamespace:   getInClusterNamespace(),
		leaseDuration:             DefaultLeaseDuration,
		renewDeadline:             DefaultRenewDeadline,
		retryPeriod:               DefaultRetryPeriod,
		metrics:                   metrics.New(controllerSubsystem),
		metricsPort:               DefaultMetricsPort,
		metricsAddress:            DefaultMetricsAddress,
		metricsPath:               DefaultMetricsPath,
		addFinalizer:              DefaultAddFinalizer,
		hasRun:                    false,
		hasRunLock:                &sync.Mutex{},
	}

	for _, option := range options {
		err := option(controller)
		if err != nil {
			logger.Error(err, "Error processing controller options")
			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
		}
	}

	var rateLimiter workqueue.RateLimiter
	if controller.rateLimiter != nil {
		// rateLimiter set via parameter takes precedence
		rateLimiter = controller.rateLimiter
	} else if controller.exponentialBackOffOnError {
		rateLimiter = workqueue.NewMaxOfRateLimiter(
			workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 1000*time.Second),
			&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
		)
	} else {
		rateLimiter = workqueue.NewMaxOfRateLimiter(
			workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 15*time.Second),
			&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
		)
	}
	controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
	controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes")

	informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod)

	// ----------------------
	// PersistentVolumeClaims

	claimHandler := cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { controller.enqueueClaim(obj) },
		UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueClaim(newObj) },
		DeleteFunc: func(obj interface{}) {
			// NOOP. The claim is either in claimsInProgress and in the queue, so it will be processed as usual
			// or it's not in claimsInProgress and then we don't care
		},
	}

	if controller.claimInformer != nil {
		controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod)
	} else {
		controller.claimInformer = informer.Core().V1().PersistentVolumeClaims().Informer()
		controller.claimInformer.AddEventHandler(claimHandler)
	}
	err = controller.claimInformer.AddIndexers(cache.Indexers{uidIndex: func(obj interface{}) ([]string, error) {
		uid, err := getObjectUID(obj)
		if err != nil {
			return nil, err
		}
		return []string{uid}, nil
	}})
	if err != nil {
		logger.Error(err, "Error setting indexer for pvc informer", "indexer", uidIndex)
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}
	controller.claimsIndexer = controller.claimInformer.GetIndexer()

	// -----------------
	// PersistentVolumes

	volumeHandler := cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { controller.enqueueVolume(obj) },
		UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueVolume(newObj) },
		DeleteFunc: func(obj interface{}) { controller.forgetVolume(obj) },
	}

	if controller.volumeInformer != nil {
		controller.volumeInformer.AddEventHandlerWithResyncPeriod(volumeHandler, controller.resyncPeriod)
	} else {
		controller.volumeInformer = informer.Core().V1().PersistentVolumes().Informer()
		controller.volumeInformer.AddEventHandler(volumeHandler)
	}
	controller.volumes = controller.volumeInformer.GetStore()

	// --------------
	// StorageClasses

	// no resource event handler needed for StorageClasses
	if controller.classInformer == nil {
		controller.classInformer = informer.Storage().V1().StorageClasses().Informer()
	}
	controller.classes = controller.classInformer.GetStore()

	if controller.createProvisionerPVLimiter != nil {
		logger.V(2).Info("Using saving PVs to API server in background")
		controller.volumeStore = NewVolumeStoreQueue(client, controller.createProvisionerPVLimiter, controller.claimsIndexer, controller.eventRecorder)
	} else {
		if controller.createProvisionedPVBackoff == nil {
			// Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default.
			if controller.createProvisionedPVInterval == 0 {
				controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval
			}
			if controller.createProvisionedPVRetryCount == 0 {
				controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount
			}
			controller.createProvisionedPVBackoff = &wait.Backoff{
				Duration: controller.createProvisionedPVInterval,
				Factor:   1, // linear backoff
				Steps:    controller.createProvisionedPVRetryCount,
				// Cap:      controller.createProvisionedPVInterval,
			}
		}
		logger.V(2).Info("Using blocking saving PVs to API server")
		controller.volumeStore = NewBackoffStore(client, controller.eventRecorder, controller.createProvisionedPVBackoff, controller)
	}

	return controller
}

syncClaim()

  1. 判断是否应该进行供给:
    • 调用ctrl.shouldProvision(ctx, claim)方法来判断是否需要对这个PVC进行供给操作。如果返回错误,则更新供给统计信息并返回错误。
    • 如果shouldtrue,表示需要进行供给操作。
  2. 供给操作:
    • 记录供给操作的开始时间。
    • 从上下文中获取logger对象。
    • 调用ctrl.provisionClaimOperation(ctx, claim)方法进行供给操作,返回操作状态和可能的错误。
    • 更新供给统计信息,传入错误和开始时间。
  3. 处理供给操作的结果:
    • 如果供给操作没有错误或者状态是ProvisioningFinished,表示供给操作已经完成或者不需要进行。根据错误类型进行不同的处理:
      • 如果没有错误,记录日志并删除该PVC在claimsInProgress中的记录。
      • 如果错误是errStopProvision,记录日志并将错误置为nil(调用者会重新排队处理)。
      • 其他错误类型,记录日志。
    • 如果供给状态是ProvisioningInBackground,表示供给操作正在后台进行,记录日志并将PVC添加到claimsInProgress中。
    • 如果供给状态是ProvisioningNoChange,不做任何修改,保持claimsInProgress的状态不变。
  4. 返回错误:
    • 如果不需要进行供给操作或者供给操作已经完成并且没有需要处理的错误,则返回nil
    • 否则,返回供给操作中的错误。
      这段代码的主要逻辑是围绕PVC的供给状态进行操作,根据供给的结果更新内部状态(如claimsInProgress),并记录相关的日志信息。通过这种方式,ProvisionController能够管理多个PVC的供给过程,确保每个PVC都能够被正确地处理。
func (ctrl *ProvisionController) syncClaim(ctx context.Context, obj interface{}) error {
	claim, ok := obj.(*v1.PersistentVolumeClaim)
	if !ok {
		return fmt.Errorf("expected claim but got %+v", obj)
	}

	should, err := ctrl.shouldProvision(ctx, claim)
	if err != nil {
		ctrl.updateProvisionStats(claim, err, time.Time{})
		return err
	} else if should {
		startTime := time.Now()
		logger := klog.FromContext(ctx)

		status, err := ctrl.provisionClaimOperation(ctx, claim)
		ctrl.updateProvisionStats(claim, err, startTime)
		if err == nil || status == ProvisioningFinished {
			// Provisioning is 100% finished / not in progress.
			switch err {
			case nil:
				logger.V(5).Info("Claim processing succeeded, removing PVC from claims in progress", "claimUID", claim.UID)
			case errStopProvision:
				logger.V(5).Info("Stop provisioning, removing PVC from claims in progress", "claimUID", claim.UID)
				// Our caller would requeue if we pass on this special error; return nil instead.
				err = nil
			default:
				logger.V(2).Info("Final error received, removing PVC from claims in progress", "claimUID", claim.UID)
			}
			ctrl.claimsInProgress.Delete(string(claim.UID))
			return err
		}
		if status == ProvisioningInBackground {
			// Provisioning is in progress in background.
			logger.V(2).Info("Temporary error received, adding PVC to claims in progress", "claimUID", claim.UID)
			ctrl.claimsInProgress.Store(string(claim.UID), claim)
		} else {
			// status == ProvisioningNoChange.
			// Don't change claimsInProgress:
			// - the claim is already there if previous status was ProvisioningInBackground.
			// - the claim is not there if if previous status was ProvisioningFinished.
		}
		return err
	}
	return nil
}
shouldProvision()
  1. 检查PVC是否已指定卷名
    • 如果claim.Spec.VolumeName不为空,表示这个PVC已经绑定到了一个具体的卷上,因此不需要再进行供给。方法返回false, nil
  2. 检查Provisioner是否实现了Qualifier接口
    • 通过类型断言ctrl.provisioner.(Qualifier)检查ctrl.provisioner是否实现了Qualifier接口。
    • 如果实现了,并且Qualifier接口的ShouldProvision方法返回false,则表示不需要进行供给。方法返回false, nil
  3. 检查PVC的注解以确定Provisioner
    • 首先尝试从PVC的注解中获取annStorageProvisioner的值。
    • 如果不存在,则尝试获取annBetaStorageProvisioner的值。
    • 这两个注解用于指定负责供给卷的Provisioner。
  4. 检查找到的Provisioner是否是已知的
    • 如果找到了Provisioner的注解,并且这个Provisioner是控制器已知的(通过ctrl.knownProvisioner(provisioner)检查),则继续下一步。
  5. 检查StorageClass的VolumeBindingMode
    • 通过util.GetPersistentVolumeClaimClass(claim)获取PVC所属的StorageClass。
    • 通过ctrl.getStorageClass(claimClass)获取这个StorageClass的详细信息。
    • 检查StorageClass的
      检查StorageClass的VolumeBindingMode。如果设置为storage.VolumeBindingWaitForFirstConsumer(即延迟绑定模式),则需要进一步检查PVC的注解中是否有annSelectedNode
      • 如果有annSelectedNode且其值不为空,表示已经选定了节点,可以进行供给。方法返回true, nil
      • 如果没有或值为空,则不进行供给。方法返回false, nil
  6. 默认进行供给
    • 如果StorageClass的VolumeBindingMode不是延迟绑定模式,或者没有找到VolumeBindingMode,则默认需要进行供给。方法返回true, nil
  7. 未找到Provisioner
    • 如果在PVC的注解中没有找到任何Provisioner的标识,则不进行供给。方法返回false, nil

总结来说,这段代码通过检查PVC的各种属性和注解,以及关联的StorageClass的配置,来决定是否需要对这个PVC进行卷的供给。这涉及到检查是否已经指定了卷、是否满足特定的供给条件、是否使用了延迟绑定模式等多个方面

// shouldProvision returns whether a claim should have a volume provisioned for
// it, i.e. whether a Provision is "desired"
func (ctrl *ProvisionController) shouldProvision(ctx context.Context, claim *v1.PersistentVolumeClaim) (bool, error) {
    if claim.Spec.VolumeName != "" {
        return false, nil
    }
    if qualifier, ok := ctrl.provisioner.(Qualifier); ok {
        if !qualifier.ShouldProvision(ctx, claim) {
            return false, nil
        }
    }
    provisioner, found := claim.Annotations[annStorageProvisioner]
    if !found {
        provisioner, found = claim.Annotations[annBetaStorageProvisioner]
    }
    if found {
        if ctrl.knownProvisioner(provisioner) {
            claimClass := util.GetPersistentVolumeClaimClass(claim)
            class, err := ctrl.getStorageClass(claimClass)
            if err != nil {
                return false, err
            }
            if class.VolumeBindingMode != nil && *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer {
                // When claim is in delay binding mode, annSelectedNode is
                // required to provision volume.
                // Though PV controller set annStorageProvisioner only when
                // annSelectedNode is set, but provisioner may remove
                // annSelectedNode to notify scheduler to reschedule again.
                if selectedNode, ok := claim.Annotations[annSelectedNode]; ok && selectedNode != "" {
                    return true, nil
                }
                return false, nil
            }
            return true, nil
        }
    }
    return false, nil
}
provisionClaimOperation()
  1. 获取PVC的类别
    • 使用util.GetPersistentVolumeClaimClass(claim)获取PVC的存储类别(StorageClass)。
  2. 日志记录
    • 使用Kubernetes的日志库klog来记录日志,包括PVC和StorageClass的信息。
  3. 检查PV是否已经存在
    • 通过ctrl.getProvisionedVolumeNameForClaim(claim)获取预期的PV名称,然后检查这个PV是否已经在ctrl.volumes中存在。如果存在,说明PV已经被分配,函数返回ProvisioningFinishederrStopProvision
  4. 获取PVC的引用
    • 使用ref.GetReference(scheme.Scheme, claim)获取PVC的引用,以便在后续操作中引用这个PVC对象。
  5. 检查是否可以分配
    • 调用ctrl.canProvision(ctx, claim)检查当前的ProvisionController是否可以处理这个PVC的分配请求。如果不能,记录事件并返回错误。
  6. 获取StorageClass信息
    • 通过ctrl.getStorageClass(claimClass)获取PVC指定的StorageClass的信息。如果获取失败或StorageClass的Provisioner不被当前ProvisionController支持,则记录错误并返回。
  7. 获取选定的节点
    • 如果PVC的注解中指定了选定的节点(annSelectedNodeannAlphaSelectedNode),则尝试获取这个节点的信息。如果节点不存在,调用ctrl.provisionVolumeErrorHandling处理错误。
  8. 准备分配选项
    • 创建一个ProvisionOptions对象,包含StorageClass、PV名称、PVC对象和选定的节点信息。
  9. 记录正常事件
    • 使用ctrl.eventRecorder.Event记录一个正常事件,表示外部Provisioner正在为PVC分配存储卷。
  10. 调用Provisioner进行分配
    • 调用ctrl.provisioner.Provision(ctx, options)尝试分配存储卷。如果分配失败,根据错误类型进行相应的错误处理。
  11. 设置PVC的引用和Finalizer
    • 如果分配成功,设置PV的ClaimRef为PVC的引用,并根据需要添加Finalizer。
  12. 更新PV的元数据和存储类别
    • 更新PV的注解和存储类别信息。
  13. 存储和添加PV
    • 使用ctrl.volumeStore.StoreVolume存储PV信息,并将PV添加到ctrl.volumes中。
  14. 返回结果
    • 如果所有操作都成功,函数返回ProvisioningFinishednil表示成功完成分配。

这个函数涵盖了从检查PV是否存在到实际分配存储卷,再到更新内部状态和记录相关事件的整个过程。它是Kubernetes存储卷分配流程中的一个关键部分,确保了PVC能够被正确地处理和分配存储资源。

func (ctrl *ProvisionController) provisionClaimOperation(ctx context.Context, claim *v1.PersistentVolumeClaim) (ProvisioningState, error) {
	// Most code here is identical to that found in controller.go of kube's PV controller...
	claimClass := util.GetPersistentVolumeClaimClass(claim)
	logger := klog.LoggerWithValues(klog.FromContext(ctx), "PVC", klog.KObj(claim), "StorageClass", claimClass)
	logger.V(4).Info("Started")

	//  A previous doProvisionClaim may just have finished while we were waiting for
	//  the locks. Check that PV (with deterministic name) hasn't been provisioned
	//  yet.
	pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
	_, exists, err := ctrl.volumes.GetByKey(pvName)
	if err == nil && exists {
		// Volume has been already provisioned, nothing to do.
		logger.V(4).Info("PersistentVolume already exists, skipping", "PV", pvName)
		return ProvisioningFinished, errStopProvision
	}

	// Prepare a claimRef to the claim early (to fail before a volume is
	// provisioned)
	claimRef, err := ref.GetReference(scheme.Scheme, claim)
	if err != nil {
		logger.Error(err, "Unexpected error getting claim reference")
		return ProvisioningNoChange, err
	}

	// Check if this provisioner can provision this claim.
	if err = ctrl.canProvision(ctx, claim); err != nil {
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
		logger.Error(err, "Failed to provision volume")
		return ProvisioningFinished, errStopProvision
	}

	// For any issues getting fields from StorageClass (including reclaimPolicy & mountOptions),
	// retry the claim because the storageClass can be fixed/(re)created independently of the claim
	class, err := ctrl.getStorageClass(claimClass)
	if err != nil {
		logger.Error(err, "Error getting claim's StorageClass's fields")
		return ProvisioningFinished, err
	}
	if !ctrl.knownProvisioner(class.Provisioner) {
		// class.Provisioner has either changed since shouldProvision() or
		// annDynamicallyProvisioned contains different provisioner than
		// class.Provisioner.
		logger.Error(nil, "Unknown provisioner requested in claim's StorageClass", "provisioner", class.Provisioner)
		return ProvisioningFinished, errStopProvision
	}

	var selectedNode *v1.Node
	// Get SelectedNode
	if nodeName, ok := getString(claim.Annotations, annSelectedNode, annAlphaSelectedNode); ok {
		if ctrl.nodeLister != nil {
			selectedNode, err = ctrl.nodeLister.Get(nodeName)
		} else {
			selectedNode, err = ctrl.client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) // TODO (verult) cache Nodes
		}
		if err != nil {
			// if node does not exist, reschedule and remove volume.kubernetes.io/selected-node annotation
			if apierrs.IsNotFound(err) {
				ctx2 := klog.NewContext(ctx, logger)
				return ctrl.provisionVolumeErrorHandling(ctx2, ProvisioningReschedule, err, claim)
			}
			err = fmt.Errorf("failed to get target node: %v", err)
			ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
			return ProvisioningNoChange, err
		}
	}

	options := ProvisionOptions{
		StorageClass: class,
		PVName:       pvName,
		PVC:          claim,
		SelectedNode: selectedNode,
	}

	ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", klog.KObj(claim)))

	volume, result, err := ctrl.provisioner.Provision(ctx, options)
	if err != nil {
		if ierr, ok := err.(*IgnoredError); ok {
			// Provision ignored, do nothing and hope another provisioner will provision it.
			logger.V(4).Info("Volume provision ignored", "reason", ierr)
			return ProvisioningFinished, errStopProvision
		}

		ctx2 := klog.NewContext(ctx, logger)
		err = fmt.Errorf("failed to provision volume with StorageClass %q: %v", claimClass, err)
		return ctrl.provisionVolumeErrorHandling(ctx2, result, err, claim)
	}

	logger.V(4).Info("Volume is provisioned", "PV", volume.Name)

	// Set ClaimRef and the PV controller will bind and set annBoundByController for us
	volume.Spec.ClaimRef = claimRef

	// Add external provisioner finalizer if it doesn't already have it
	if ctrl.addFinalizer && !ctrl.checkFinalizer(volume, finalizerPV) {
		volume.ObjectMeta.Finalizers = append(volume.ObjectMeta.Finalizers, finalizerPV)
	}

	metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annDynamicallyProvisioned, class.Provisioner)
	volume.Spec.StorageClassName = claimClass

	logger.V(4).Info("Succeeded")

	if err := ctrl.volumeStore.StoreVolume(logger, claim, volume); err != nil {
		return ProvisioningFinished, err
	}
	if err = ctrl.volumes.Add(volume); err != nil {
		utilruntime.HandleError(err)
	}
	return ProvisioningFinished, nil
}

相关文章:

  • 从零开始学A2A一:A2A 协议概述与核心概念
  • 人工智能驱动的科研新范式及学科应用研究
  • 【Java学习笔记】Java初级阶段代码规范
  • 基于项目管理的轻量级目标检测自动标注系统【基于 YOLOV8】
  • 打造可控可测的星座网络:IPLOOK低轨通信仿真平台搭建实践
  • 小葱桌面电视版下载_小葱桌面app免费下载最新版
  • 关于 软件开发模型 的分类、核心特点及详细对比分析,涵盖传统模型、迭代模型、敏捷模型等主流类型
  • 2025年常见渗透测试面试题-红队面试宝典下(题目+回答)
  • 谷歌浏览器的开发者模式如何开启及安装教程
  • 关于 驱动开发方法 的详细分类、核心特点及对比分析,涵盖 TDD、MDD、BDD、DDD、ATDD、FDD、PDD 等主流方法
  • MySQL 的 `binlog_format` 是做什么的?
  • [Python基础速成]2-模块与包与OOP
  • 【MySQL】表的增删改查
  • 【HarmonyOS NEXT】多目标产物构建实践
  • Android Studio 中文字大小的单位详解
  • LeetCode 118题解 | 杨辉三角
  • 【Maniskill】训练使用的性能指标
  • 【5G学习】基本概念之多频资源以及子载波和信道
  • RabbitMQ 优先级队列详解
  • 自适应调度器:动态分配测试资源
  • 杨荫凯履新浙江省委常委、组织部部长,曾任中央财办副主任
  • 韩冬任国家广播电视总局副局长,齐竹泉任中央广播电视总台副台长
  • 世卫发布预防少女怀孕新指南,呼吁终止童婚、延长女孩受教育时间
  • 特朗普激发加拿大爱国热情之下:大选提前投票人数创纪录,魁北克分离情绪被冲淡
  • 美国那点事|特朗普的“刀”砍向国务院,美国霸权迎来历史拐点?
  • “动漫短剧”值不值得做?