CSI-external-provisioner
main()
This Go code implements a CSI (Container Storage Interface) provisioner used to dynamically provision persistent volumes in a Kubernetes cluster. It involves a number of components and steps; the key parts are explained below:
- Initialization and configuration
  - Command-line flags and environment variables: the code uses the `flag` package to handle command-line arguments such as `feature-gates` and `kubeconfig`, and reads further configuration from environment variables such as `NODE_NAME`.
  - Logging and metrics: it uses `klog` for logging and sets up Prometheus metrics collectors.
- Kubernetes client configuration
  - Building the kube config: depending on whether `master` or `kubeconfig` was provided, it uses either the in-cluster configuration (`rest.InClusterConfig`) or an external one (`clientcmd.BuildConfigFromFlags`).
  - Client creation: the config is used to create the Kubernetes client (`kubernetes.NewForConfig`) and the CSI snapshot client (`snapclientset.NewForConfig`).
- CSI driver connection and validation
  - Connecting to the CSI driver: it connects to the CSI driver over gRPC and probes it (`ctrl.Probe`) to make sure the driver is available.
  - Driver name and capabilities: it queries the driver name (`ctrl.GetDriverName`) and its capabilities (`ctrl.GetDriverCapabilities`).
- Topology and node information
  - Topology support: if the CSI driver supports topology, informers are created to watch Node and CSINode objects.
  - Node deployment: if node deployment is enabled (`--enable-node-deployment`), node information is fetched from the driver and a node-deployment object is configured.
- Provisioner and controller creation
  - Provisioner creation: the collected configuration and clients are used to create the CSI provisioner object, which implements the Provisioner interface.
  - Capacity controller: if the capacity feature is enabled (`--enable-capacity`), a capacity controller is created to publish storage capacity information.
- HTTP server and metrics
  - HTTP server: if a metrics address (`--metrics-address`) or HTTP endpoint (`--http-endpoint`) is specified, an HTTP server is started to expose metrics and, optionally, debug endpoints (pprof).
- Informers and queues
  - Informers for the relevant resources watch for changes to Kubernetes objects, and work queues are used to process the resulting events.
- Run
  - The informer factories and controllers are started, and the program begins watching and processing events.
Summary
This is a fairly involved CSI provisioner implementation that ties together Kubernetes clients, the CSI driver, metrics collection, topology awareness, capacity management, and more. Its modular structure lets it provision and manage persistent volumes efficiently in a Kubernetes cluster.
func main() {
var config *rest.Config
var err error
flag.Var(utilflag.NewMapStringBool(&featureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features. "+
"Options are:\n"+strings.Join(utilfeature.DefaultFeatureGate.KnownFeatures(), "\n"))
klog.InitFlags(nil)
flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
flag.Set("logtostderr", "true")
flag.Parse()
ctx := context.Background()
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(featureGates); err != nil {
klog.Fatal(err)
}
node := os.Getenv("NODE_NAME")
if *enableNodeDeployment && node == "" {
klog.Fatal("The NODE_NAME environment variable must be set when using --enable-node-deployment.")
}
if *showVersion {
fmt.Println(os.Args[0], version)
os.Exit(0)
}
klog.Infof("Version: %s", version)
if *metricsAddress != "" && *httpEndpoint != "" {
klog.Error("only one of `--metrics-address` and `--http-endpoint` can be set.")
os.Exit(1)
}
addr := *metricsAddress
if addr == "" {
addr = *httpEndpoint
}
// get the KUBECONFIG from env if specified (useful for local/debug cluster)
kubeconfigEnv := os.Getenv("KUBECONFIG")
if kubeconfigEnv != "" {
klog.Infof("Found KUBECONFIG environment variable set, using that..")
kubeconfig = &kubeconfigEnv
}
if *master != "" || *kubeconfig != "" {
klog.Infof("Either master or kubeconfig specified. building kube config from that..")
config, err = clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
} else {
klog.Infof("Building kube configs for running in cluster...")
config, err = rest.InClusterConfig()
}
if err != nil {
klog.Fatalf("Failed to create config: %v", err)
}
config.QPS = *kubeAPIQPS
config.Burst = *kubeAPIBurst
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create client: %v", err)
}
// snapclientset.NewForConfig creates a new Clientset for VolumesnapshotV1Client
snapClient, err := snapclientset.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create snapshot client: %v", err)
}
var gatewayClient gatewayclientset.Interface
if utilfeature.DefaultFeatureGate.Enabled(features.CrossNamespaceVolumeDataSource) {
// gatewayclientset.NewForConfig creates a new Clientset for GatewayClient
gatewayClient, err = gatewayclientset.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create gateway client: %v", err)
}
}
metricsManager := metrics.NewCSIMetricsManagerWithOptions("", /* driverName */
// Will be provided via default gatherer.
metrics.WithProcessStartTime(false),
metrics.WithSubsystem(metrics.SubsystemSidecar),
)
grpcClient, err := ctrl.Connect(ctx, *csiEndpoint, metricsManager)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}
err = ctrl.Probe(ctx, grpcClient, *operationTimeout)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}
// Autodetect provisioner name
provisionerName, err := ctrl.GetDriverName(grpcClient, *operationTimeout)
if err != nil {
klog.Fatalf("Error getting CSI driver name: %s", err)
}
klog.V(2).Infof("Detected CSI driver %s", provisionerName)
metricsManager.SetDriverName(provisionerName)
translator := csitrans.New()
supportsMigrationFromInTreePluginName := ""
if translator.IsMigratedCSIDriverByName(provisionerName) {
supportsMigrationFromInTreePluginName, err = translator.GetInTreeNameFromCSIName(provisionerName)
if err != nil {
klog.Fatalf("Failed to get InTree plugin name for migrated CSI plugin %s: %v", provisionerName, err)
}
klog.V(2).Infof("Supports migration from in-tree plugin: %s", supportsMigrationFromInTreePluginName)
// Create a new connection with the metrics manager with migrated label
metricsManager = metrics.NewCSIMetricsManagerWithOptions(provisionerName,
// Will be provided via default gatherer.
metrics.WithProcessStartTime(false),
metrics.WithMigration())
migratedGrpcClient, err := ctrl.Connect(ctx, *csiEndpoint, metricsManager)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}
grpcClient.Close()
grpcClient = migratedGrpcClient
err = ctrl.Probe(ctx, grpcClient, *operationTimeout)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}
}
// Prepare http endpoint for metrics + leader election healthz
mux := http.NewServeMux()
gatherers := prometheus.Gatherers{
// For workqueue and leader election metrics, set up via the anonymous imports of:
// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/metrics/prometheus/workqueue/metrics.go
// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/leaderelection/metrics.go
//
// Also happens to include Go runtime and process metrics:
// https://github.com/kubernetes/kubernetes/blob/9780d88cb6a4b5b067256ecb4abf56892093ee87/staging/src/k8s.io/component-base/metrics/legacyregistry/registry.go#L46-L49
legacyregistry.DefaultGatherer,
// For CSI operations.
metricsManager.GetRegistry(),
}
pluginCapabilities, controllerCapabilities, err := ctrl.GetDriverCapabilities(grpcClient, *operationTimeout)
if err != nil {
klog.Fatalf("Error getting CSI driver capabilities: %s", err)
}
// Generate a unique ID for this provisioner
timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName
if *enableNodeDeployment {
identity = identity + "-" + node
}
factory := informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
var factoryForNamespace informers.SharedInformerFactory // usually nil, only used for CSIStorageCapacity
// -------------------------------
// Listers
// Create informers to avoid hitting the API server for every resource request
scLister := factory.Storage().V1().StorageClasses().Lister()
claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()
var vaLister storagelistersv1.VolumeAttachmentLister
if controllerCapabilities[csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME] {
klog.Info("CSI driver supports PUBLISH_UNPUBLISH_VOLUME, watching VolumeAttachments")
vaLister = factory.Storage().V1().VolumeAttachments().Lister()
} else {
klog.Info("CSI driver does not support PUBLISH_UNPUBLISH_VOLUME, not watching VolumeAttachments")
}
var nodeDeployment *ctrl.NodeDeployment
if *enableNodeDeployment {
nodeDeployment = &ctrl.NodeDeployment{
NodeName: node,
ClaimInformer: factory.Core().V1().PersistentVolumeClaims(),
ImmediateBinding: *nodeDeploymentImmediateBinding,
BaseDelay: *nodeDeploymentBaseDelay,
MaxDelay: *nodeDeploymentMaxDelay,
}
nodeInfo, err := ctrl.GetNodeInfo(grpcClient, *operationTimeout)
if err != nil {
klog.Fatalf("Failed to get node info from CSI driver: %v", err)
}
nodeDeployment.NodeInfo = *nodeInfo
}
var nodeLister listersv1.NodeLister
var csiNodeLister storagelistersv1.CSINodeLister
if ctrl.SupportsTopology(pluginCapabilities) {
if nodeDeployment != nil {
// Avoid watching in favor of fake, static objects. This is particularly relevant for
// Node objects, which can generate significant traffic.
csiNode := &storagev1.CSINode{
ObjectMeta: metav1.ObjectMeta{
Name: nodeDeployment.NodeName,
},
Spec: storagev1.CSINodeSpec{
Drivers: []storagev1.CSINodeDriver{
{
Name: provisionerName,
NodeID: nodeDeployment.NodeInfo.NodeId,
},
},
},
}
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: nodeDeployment.NodeName,
},
}
if nodeDeployment.NodeInfo.AccessibleTopology != nil {
for key := range nodeDeployment.NodeInfo.AccessibleTopology.Segments {
csiNode.Spec.Drivers[0].TopologyKeys = append(csiNode.Spec.Drivers[0].TopologyKeys, key)
}
node.Labels = nodeDeployment.NodeInfo.AccessibleTopology.Segments
}
klog.Infof("using local topology with Node = %+v and CSINode = %+v", node, csiNode)
// We make those fake objects available to the topology code via informers which
// never change.
stoppedFactory := informers.NewSharedInformerFactory(clientset, 1000*time.Hour)
csiNodes := stoppedFactory.Storage().V1().CSINodes()
nodes := stoppedFactory.Core().V1().Nodes()
csiNodes.Informer().GetStore().Add(csiNode)
nodes.Informer().GetStore().Add(node)
csiNodeLister = csiNodes.Lister()
nodeLister = nodes.Lister()
} else {
csiNodeLister = factory.Storage().V1().CSINodes().Lister()
nodeLister = factory.Core().V1().Nodes().Lister()
}
}
var referenceGrantLister referenceGrantv1beta1.ReferenceGrantLister
var gatewayFactory gatewayInformers.SharedInformerFactory
if utilfeature.DefaultFeatureGate.Enabled(features.CrossNamespaceVolumeDataSource) {
gatewayFactory = gatewayInformers.NewSharedInformerFactory(gatewayClient, ctrl.ResyncPeriodOfReferenceGrantInformer)
referenceGrants := gatewayFactory.Gateway().V1beta1().ReferenceGrants()
referenceGrantLister = referenceGrants.Lister()
}
// -------------------------------
// PersistentVolumeClaims informer
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)
claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()
// Setup options
provisionerOptions := []func(*controller.ProvisionController) error{
controller.LeaderElection(false), // Always disable leader election in provisioner lib. Leader election should be done here in the CSI provisioner level instead.
controller.FailedProvisionThreshold(0),
controller.FailedDeleteThreshold(0),
controller.RateLimiter(rateLimiter),
controller.Threadiness(int(*workerThreads)),
controller.CreateProvisionedPVLimiter(workqueue.DefaultControllerRateLimiter()),
controller.ClaimsInformer(claimInformer),
controller.NodesLister(nodeLister),
}
if utilfeature.DefaultFeatureGate.Enabled(features.HonorPVReclaimPolicy) {
provisionerOptions = append(provisionerOptions, controller.AddFinalizer(true))
}
if supportsMigrationFromInTreePluginName != "" {
provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
}
// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(
clientset,
*operationTimeout,
identity,
*volumeNamePrefix,
*volumeNameUUIDLength,
grpcClient,
snapClient,
provisionerName,
pluginCapabilities,
controllerCapabilities,
supportsMigrationFromInTreePluginName,
*strictTopology,
*immediateTopology,
translator,
scLister,
csiNodeLister,
nodeLister,
claimLister,
vaLister,
referenceGrantLister,
*extraCreateMetadata,
*defaultFSType,
nodeDeployment,
*controllerPublishReadOnly,
*preventVolumeModeConversion,
)
var capacityController *capacity.Controller
if *enableCapacity {
// Publishing storage capacity information uses its own client
// with separate rate limiting.
config.QPS = *kubeAPICapacityQPS
config.Burst = *kubeAPICapacityBurst
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create client: %v", err)
}
namespace := os.Getenv("NAMESPACE")
if namespace == "" {
klog.Fatal("need NAMESPACE env variable for CSIStorageCapacity objects")
}
var controller *metav1.OwnerReference
if *capacityOwnerrefLevel >= 0 {
podName := os.Getenv("POD_NAME")
if podName == "" {
klog.Fatal("need POD_NAME env variable to determine CSIStorageCapacity owner")
}
var err error
controller, err = owner.Lookup(config, namespace, podName,
schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "Pod",
}, *capacityOwnerrefLevel)
if err != nil {
klog.Fatalf("look up owner(s) of pod: %v", err)
}
klog.Infof("using %s/%s %s as owner of CSIStorageCapacity objects", controller.APIVersion, controller.Kind, controller.Name)
}
var topologyInformer topology.Informer
if nodeDeployment == nil {
topologyInformer = topology.NewNodeTopology(
provisionerName,
clientset,
factory.Core().V1().Nodes(),
factory.Storage().V1().CSINodes(),
workqueue.NewNamedRateLimitingQueue(rateLimiter, "csitopology"),
)
} else {
var segment topology.Segment
if nodeDeployment.NodeInfo.AccessibleTopology != nil {
for key, value := range nodeDeployment.NodeInfo.AccessibleTopology.Segments {
segment = append(segment, topology.SegmentEntry{Key: key, Value: value})
}
}
klog.Infof("producing CSIStorageCapacity objects with fixed topology segment %s", segment)
topologyInformer = topology.NewFixedNodeTopology(&segment)
}
go topologyInformer.RunWorker(ctx)
managedByID := "external-provisioner"
if *enableNodeDeployment {
managedByID = getNameWithMaxLength(managedByID, node, validation.DNS1035LabelMaxLength)
}
// We only need objects from our own namespace. The normal factory would give
// us an informer for the entire cluster. We can further restrict the
// watch to just those objects with the right labels.
factoryForNamespace = informers.NewSharedInformerFactoryWithOptions(clientset,
ctrl.ResyncPeriodOfCsiNodeInformer,
informers.WithNamespace(namespace),
informers.WithTweakListOptions(func(lo *metav1.ListOptions) {
lo.LabelSelector = labels.Set{
capacity.DriverNameLabel: provisionerName,
capacity.ManagedByLabel: managedByID,
}.AsSelector().String()
}),
)
// We use the V1 CSIStorageCapacity API if available.
clientFactory := capacity.NewV1ClientFactory(clientset)
cInformer := factoryForNamespace.Storage().V1().CSIStorageCapacities()
// This invalid object is used in a v1 Create call to determine
// based on the resulting error whether the v1 API is supported.
invalidCapacity := &storagev1.CSIStorageCapacity{
ObjectMeta: metav1.ObjectMeta{
Name: "%123-invalid-name",
},
}
createdCapacity, err := clientset.StorageV1().CSIStorageCapacities(namespace).Create(ctx, invalidCapacity, metav1.CreateOptions{})
switch {
case err == nil:
klog.Fatalf("creating an invalid v1.CSIStorageCapacity didn't fail as expected, got: %s", createdCapacity)
case apierrors.IsNotFound(err):
// We need to bridge between the v1beta1 API on the
// server and the v1 API expected by the capacity code.
klog.Info("using the CSIStorageCapacity v1beta1 API")
clientFactory = capacity.NewV1beta1ClientFactory(clientset)
cInformer = capacity.NewV1beta1InformerBridge(factoryForNamespace.Storage().V1beta1().CSIStorageCapacities())
case apierrors.IsInvalid(err):
klog.Info("using the CSIStorageCapacity v1 API")
default:
klog.Fatalf("unexpected error when checking for the V1 CSIStorageCapacity API: %v", err)
}
capacityController = capacity.NewCentralCapacityController(
csi.NewControllerClient(grpcClient),
provisionerName,
clientFactory,
// Metrics for the queue is available in the default registry.
workqueue.NewNamedRateLimitingQueue(rateLimiter, "csistoragecapacity"),
controller,
managedByID,
namespace,
topologyInformer,
factory.Storage().V1().StorageClasses(),
cInformer,
*capacityPollInterval,
*capacityImmediateBinding,
*operationTimeout,
)
legacyregistry.CustomMustRegister(capacityController)
// Wrap Provision and Delete to detect when it is time to refresh capacity.
csiProvisioner = capacity.NewProvisionWrapper(csiProvisioner, capacityController)
}
if addr != "" {
// Start HTTP server, regardless whether we are the leader or not.
// Register provisioner metrics manually to be able to add multiplexer in front of it
m := libmetrics.New("controller")
reg := prometheus.NewRegistry()
reg.MustRegister([]prometheus.Collector{
m.PersistentVolumeClaimProvisionTotal,
m.PersistentVolumeClaimProvisionFailedTotal,
m.PersistentVolumeClaimProvisionDurationSeconds,
m.PersistentVolumeDeleteTotal,
m.PersistentVolumeDeleteFailedTotal,
m.PersistentVolumeDeleteDurationSeconds,
}...)
provisionerOptions = append(provisionerOptions, controller.MetricsInstance(m))
gatherers = append(gatherers, reg)
// This is similar to k8s.io/component-base/metrics HandlerWithReset
// except that we gather from multiple sources. This is necessary
// because both CSI metrics manager and component-base manage
// their own registry. Probably could be avoided by making
// CSI metrics manager a bit more flexible.
mux.Handle(*metricsPath,
promhttp.InstrumentMetricHandler(
reg,
promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{})))
if *enableProfile {
klog.InfoS("Starting profiling", "endpoint", httpEndpoint)
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}
go func() {
klog.Infof("ServeMux listening at %q", addr)
err := http.ListenAndServe(addr, mux)
if err != nil {
klog.Fatalf("Failed to start HTTP server at specified address (%q) and metrics path (%q): %s", addr, *metricsPath, err)
}
}()
}
logger := klog.FromContext(ctx)
provisionController = controller.NewProvisionController(
logger,
clientset,
provisionerName,
csiProvisioner,
provisionerOptions...,
)
csiClaimController := ctrl.NewCloningProtectionController(
clientset,
claimLister,
claimInformer,
claimQueue,
controllerCapabilities,
)
run := func(ctx context.Context) {
factory.Start(ctx.Done())
if factoryForNamespace != nil {
// Starting is enough, the capacity controller will
// wait for sync.
factoryForNamespace.Start(ctx.Done())
}
cacheSyncResult := factory.WaitForCacheSync(ctx.Done())
for _, v := range cacheSyncResult {
if !v {
klog.Fatalf("Failed to sync Informers!")
}
}
if utilfeature.DefaultFeatureGate.Enabled(features.CrossNamespaceVolumeDataSource) {
if gatewayFactory != nil {
gatewayFactory.Start(ctx.Done())
}
gatewayCacheSyncResult := gatewayFactory.WaitForCacheSync(ctx.Done())
for _, v := range gatewayCacheSyncResult {
if !v {
klog.Fatalf("Failed to sync Informers for gateway!")
}
}
}
if capacityController != nil {
go capacityController.Run(ctx, int(*capacityThreads))
}
if csiClaimController != nil {
go csiClaimController.Run(ctx, int(*finalizerThreads))
}
provisionController.Run(ctx)
}
if !*enableLeaderElection {
run(ctx)
} else {
// this lock name pattern is also copied from sigs.k8s.io/sig-storage-lib-external-provisioner/controller
// to preserve backwards compatibility
lockName := strings.Replace(provisionerName, "/", "-", -1)
// create a new clientset for leader election
leClientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create leaderelection client: %v", err)
}
le := leaderelection.NewLeaderElection(leClientset, lockName, run)
if *httpEndpoint != "" {
le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout)
}
if *leaderElectionNamespace != "" {
le.WithNamespace(*leaderElectionNamespace)
}
le.WithLeaseDuration(*leaderElectionLeaseDuration)
le.WithRenewDeadline(*leaderElectionRenewDeadline)
le.WithRetryPeriod(*leaderElectionRetryPeriod)
le.WithIdentity(identity)
if err := le.Run(); err != nil {
klog.Fatalf("failed to initialize leader election: %v", err)
}
}
}
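One detail from main() worth isolating is how metrics from several sources (the component-base legacy registry, the CSI metrics manager's registry, and the provisioner library's registry) end up on a single endpoint: they are collected into a `prometheus.Gatherers` slice and served through `promhttp.HandlerFor`. Below is a minimal, standalone sketch of that pattern; the registries and the metric are illustrative placeholders, not the provisioner's actual ones.
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Two independent registries, standing in for e.g. the CSI metrics
	// manager's registry and the provisioner library's registry.
	regA := prometheus.NewRegistry()
	regB := prometheus.NewRegistry()

	exampleCounter := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_provision_total",
		Help: "Illustrative counter registered in one of the registries.",
	})
	regA.MustRegister(exampleCounter)

	// prometheus.Gatherers merges the output of all gatherers at scrape time,
	// just like the gatherers slice assembled in main() above.
	gatherers := prometheus.Gatherers{regA, regB}

	mux := http.NewServeMux()
	// InstrumentMetricHandler registers promhttp_* metrics about the handler
	// itself in regA, mirroring how the provisioner wraps its handler.
	mux.Handle("/metrics", promhttp.InstrumentMetricHandler(
		regA, promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{})))

	_ = http.ListenAndServe(":8080", mux)
}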
NewProvisionController()
- Get the hostname and generate a unique ID:
  - `os.Hostname()` returns the current hostname; if it fails, the error is logged and the program exits.
  - The hostname is combined with a UUID to produce a unique ID, so that multiple processes on the same host do not collide.
- Initialize the event recorder:
  - `record.NewBroadcaster()` creates an event broadcaster, configured for structured logging and for recording events to the API server.
  - An `eventRecorder` is created from it for emitting events.
- Create and initialize the `ProvisionController` instance:
  - The `ProvisionController` struct is populated with the client, provisioner name, provisioner implementation, ID, component name, event recorder, and so on.
  - A set of defaults is applied: resync period, exponential backoff on error, threadiness, failure thresholds, leader-election settings, etc.
  - The metrics configuration is initialized.
- Process the option functions:
  - Each option function passed in is applied to the `ProvisionController`; if one fails, the error is logged and the program exits.
- Initialize the rate limiter and work queues:
  - A rate limiter is built from the configuration and used to create the `claimQueue` and `volumeQueue` work queues.
- Initialize informers and event handlers:
  - `informers.NewSharedInformerFactory` creates a shared informer factory.
  - Event handlers and an indexer are registered for PersistentVolumeClaims (PVCs), PersistentVolumes (PVs), and StorageClasses.
  - The informers watch for changes to these Kubernetes resources and trigger the corresponding handlers.
- Initialize the VolumeStore:
  - Depending on the configuration, either `NewVolumeStoreQueue` or `NewBackoffStore` is used to initialize the `volumeStore`, which handles creating and saving PVs.
// NewProvisionController creates a new provision controller using
// the given configuration parameters and with private (non-shared) informers.
func NewProvisionController(
logger klog.Logger,
client kubernetes.Interface,
provisionerName string,
provisioner Provisioner,
options ...func(*ProvisionController) error,
) *ProvisionController {
id, err := os.Hostname()
if err != nil {
logger.Error(err, "Error getting hostname")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
// add a uniquifier so that two processes on the same host don't accidentally both become active
id = id + "_" + string(uuid.NewUUID())
component := provisionerName + "_" + id
// TODO: Once the following PR is merged, change to use StartLogging and StartRecordingToSinkWithContext
// https://github.com/kubernetes/kubernetes/pull/120729
v1.AddToScheme(scheme.Scheme)
broadcaster := record.NewBroadcaster()
broadcaster.StartStructuredLogging(0)
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.CoreV1().Events(v1.NamespaceAll)})
eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: component})
controller := &ProvisionController{
client: client,
provisionerName: provisionerName,
provisioner: provisioner,
id: id,
component: component,
eventRecorder: eventRecorder,
resyncPeriod: DefaultResyncPeriod,
exponentialBackOffOnError: DefaultExponentialBackOffOnError,
threadiness: DefaultThreadiness,
failedProvisionThreshold: DefaultFailedProvisionThreshold,
failedDeleteThreshold: DefaultFailedDeleteThreshold,
leaderElection: DefaultLeaderElection,
leaderElectionNamespace: getInClusterNamespace(),
leaseDuration: DefaultLeaseDuration,
renewDeadline: DefaultRenewDeadline,
retryPeriod: DefaultRetryPeriod,
metrics: metrics.New(controllerSubsystem),
metricsPort: DefaultMetricsPort,
metricsAddress: DefaultMetricsAddress,
metricsPath: DefaultMetricsPath,
addFinalizer: DefaultAddFinalizer,
hasRun: false,
hasRunLock: &sync.Mutex{},
}
for _, option := range options {
err := option(controller)
if err != nil {
logger.Error(err, "Error processing controller options")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}
var rateLimiter workqueue.RateLimiter
if controller.rateLimiter != nil {
// rateLimiter set via parameter takes precedence
rateLimiter = controller.rateLimiter
} else if controller.exponentialBackOffOnError {
rateLimiter = workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 1000*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
} else {
rateLimiter = workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 15*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes")
informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod)
// ----------------------
// PersistentVolumeClaims
claimHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueClaim(obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueClaim(newObj) },
DeleteFunc: func(obj interface{}) {
// NOOP. The claim is either in claimsInProgress and in the queue, so it will be processed as usual
// or it's not in claimsInProgress and then we don't care
},
}
if controller.claimInformer != nil {
controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod)
} else {
controller.claimInformer = informer.Core().V1().PersistentVolumeClaims().Informer()
controller.claimInformer.AddEventHandler(claimHandler)
}
err = controller.claimInformer.AddIndexers(cache.Indexers{uidIndex: func(obj interface{}) ([]string, error) {
uid, err := getObjectUID(obj)
if err != nil {
return nil, err
}
return []string{uid}, nil
}})
if err != nil {
logger.Error(err, "Error setting indexer for pvc informer", "indexer", uidIndex)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
controller.claimsIndexer = controller.claimInformer.GetIndexer()
// -----------------
// PersistentVolumes
volumeHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueVolume(obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueVolume(newObj) },
DeleteFunc: func(obj interface{}) { controller.forgetVolume(obj) },
}
if controller.volumeInformer != nil {
controller.volumeInformer.AddEventHandlerWithResyncPeriod(volumeHandler, controller.resyncPeriod)
} else {
controller.volumeInformer = informer.Core().V1().PersistentVolumes().Informer()
controller.volumeInformer.AddEventHandler(volumeHandler)
}
controller.volumes = controller.volumeInformer.GetStore()
// --------------
// StorageClasses
// no resource event handler needed for StorageClasses
if controller.classInformer == nil {
controller.classInformer = informer.Storage().V1().StorageClasses().Informer()
}
controller.classes = controller.classInformer.GetStore()
if controller.createProvisionerPVLimiter != nil {
logger.V(2).Info("Using saving PVs to API server in background")
controller.volumeStore = NewVolumeStoreQueue(client, controller.createProvisionerPVLimiter, controller.claimsIndexer, controller.eventRecorder)
} else {
if controller.createProvisionedPVBackoff == nil {
// Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default.
if controller.createProvisionedPVInterval == 0 {
controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval
}
if controller.createProvisionedPVRetryCount == 0 {
controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount
}
controller.createProvisionedPVBackoff = &wait.Backoff{
Duration: controller.createProvisionedPVInterval,
Factor: 1, // linear backoff
Steps: controller.createProvisionedPVRetryCount,
// Cap: controller.createProvisionedPVInterval,
}
}
logger.V(2).Info("Using blocking saving PVs to API server")
controller.volumeStore = NewBackoffStore(client, controller.eventRecorder, controller.createProvisionedPVBackoff, controller)
}
return controller
}
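To make the wiring above concrete, here is a condensed usage sketch of NewProvisionController based only on the API visible in this file; the provisioner name, the module version in the import path, and the `myProvisioner` argument (assumed to satisfy the library's Provisioner interface) are placeholders.
package example

import (
	"context"

	"k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"

	// The module version in this import path is illustrative.
	"sigs.k8s.io/sig-storage-lib-external-provisioner/v10/controller"
)

// runProvisionController wires a Provisioner implementation into the library's
// controller, the same way main() wires in the CSI provisioner above.
func runProvisionController(ctx context.Context, clientset kubernetes.Interface, myProvisioner controller.Provisioner) {
	logger := klog.FromContext(ctx)
	pc := controller.NewProvisionController(
		logger,
		clientset,
		"example.com/my-driver", // must match the StorageClass .provisioner field
		myProvisioner,
		// Functional options, analogous to provisionerOptions in main() above.
		controller.Threadiness(4),
		controller.LeaderElection(false),
	)
	pc.Run(ctx) // blocks: starts the informers and worker goroutines
}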
syncClaim()
- Decide whether to provision:
  - `ctrl.shouldProvision(ctx, claim)` determines whether this PVC needs a volume provisioned. If it returns an error, the provisioning stats are updated and the error is returned.
  - If `should` is `true`, provisioning is required.
- Provision:
  - Record the start time of the operation and get the logger from the context.
  - Call `ctrl.provisionClaimOperation(ctx, claim)`, which returns a provisioning status and possibly an error.
  - Update the provisioning stats with the error and the start time.
- Handle the result:
  - If there was no error, or the status is `ProvisioningFinished`, provisioning is either complete or not needed; the error determines what is logged:
    - no error: the claim was processed successfully;
    - `errStopProvision`: provisioning is being stopped deliberately, and the error is reset to `nil` (otherwise the caller would requeue the claim);
    - anything else: a final error was received.
    In all of these cases the PVC is then removed from `claimsInProgress` and the (possibly nil) error is returned.
  - If the status is `ProvisioningInBackground`, provisioning is still running in the background: log it and add the PVC to `claimsInProgress`.
  - If the status is `ProvisioningNoChange`, leave `claimsInProgress` untouched.
- Return:
  - If no provisioning was needed, or it finished with no error to surface, return `nil`.
  - Otherwise, return the error from the provisioning operation.
The logic here revolves around the PVC's provisioning state: the controller updates its internal bookkeeping (such as `claimsInProgress`) based on the outcome and logs what happened. This is how the `ProvisionController` manages the provisioning of many PVCs and makes sure each one is handled correctly.
func (ctrl *ProvisionController) syncClaim(ctx context.Context, obj interface{}) error {
claim, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
return fmt.Errorf("expected claim but got %+v", obj)
}
should, err := ctrl.shouldProvision(ctx, claim)
if err != nil {
ctrl.updateProvisionStats(claim, err, time.Time{})
return err
} else if should {
startTime := time.Now()
logger := klog.FromContext(ctx)
status, err := ctrl.provisionClaimOperation(ctx, claim)
ctrl.updateProvisionStats(claim, err, startTime)
if err == nil || status == ProvisioningFinished {
// Provisioning is 100% finished / not in progress.
switch err {
case nil:
logger.V(5).Info("Claim processing succeeded, removing PVC from claims in progress", "claimUID", claim.UID)
case errStopProvision:
logger.V(5).Info("Stop provisioning, removing PVC from claims in progress", "claimUID", claim.UID)
// Our caller would requeue if we pass on this special error; return nil instead.
err = nil
default:
logger.V(2).Info("Final error received, removing PVC from claims in progress", "claimUID", claim.UID)
}
ctrl.claimsInProgress.Delete(string(claim.UID))
return err
}
if status == ProvisioningInBackground {
// Provisioning is in progress in background.
logger.V(2).Info("Temporary error received, adding PVC to claims in progress", "claimUID", claim.UID)
ctrl.claimsInProgress.Store(string(claim.UID), claim)
} else {
// status == ProvisioningNoChange.
// Don't change claimsInProgress:
// - the claim is already there if previous status was ProvisioningInBackground.
// - the claim is not there if previous status was ProvisioningFinished.
}
return err
}
return nil
}
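The branch on `ProvisioningInBackground` above is what keeps a claim in `claimsInProgress` while work continues on the driver side. How a Provisioner implementation decides which state to return is up to it; the helper below is a hypothetical sketch (not taken from the CSI provisioner) that maps gRPC error codes from a CreateVolume-style call onto these states.
package example

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	// The module version in this import path is illustrative.
	"sigs.k8s.io/sig-storage-lib-external-provisioner/v10/controller"
)

// classifyProvisionError sketches how a Provisioner could translate errors
// from a CSI CreateVolume call into the ProvisioningState values that
// syncClaim() reacts to.
func classifyProvisionError(err error) controller.ProvisioningState {
	if err == nil {
		return controller.ProvisioningFinished
	}
	switch status.Code(err) {
	case codes.DeadlineExceeded, codes.Unavailable, codes.Aborted, codes.ResourceExhausted:
		// The call may still be running or is worth retrying; reporting
		// "in background" keeps the claim tracked in claimsInProgress.
		return controller.ProvisioningInBackground
	default:
		// Everything else is treated as final for this attempt.
		return controller.ProvisioningFinished
	}
}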
shouldProvision()
- Check whether the PVC already names a volume:
  - If `claim.Spec.VolumeName` is non-empty, the PVC is already bound to a specific volume, so no provisioning is needed; return `false, nil`.
- Check whether the provisioner implements the Qualifier interface:
  - The type assertion `ctrl.provisioner.(Qualifier)` checks whether `ctrl.provisioner` implements `Qualifier`.
  - If it does and `ShouldProvision` returns `false`, no provisioning is needed; return `false, nil`.
- Inspect the PVC annotations to find the responsible provisioner:
  - First look for the `annStorageProvisioner` annotation; if it is absent, fall back to `annBetaStorageProvisioner`.
  - These annotations name the provisioner responsible for provisioning the volume.
- Check whether that provisioner is known:
  - If an annotation was found and the named provisioner is known to this controller (`ctrl.knownProvisioner(provisioner)`), continue.
- Check the StorageClass's VolumeBindingMode:
  - `util.GetPersistentVolumeClaimClass(claim)` returns the PVC's StorageClass name, and `ctrl.getStorageClass(claimClass)` fetches the class.
  - If the class's `VolumeBindingMode` is `storage.VolumeBindingWaitForFirstConsumer` (delayed binding), the PVC's `annSelectedNode` annotation is also required:
    - if `annSelectedNode` is present and non-empty, a node has been selected and provisioning can proceed; return `true, nil`;
    - otherwise, do not provision; return `false, nil`.
- Provision by default:
  - If the `VolumeBindingMode` is not delayed binding (or is not set), provisioning is required; return `true, nil`.
- No provisioner found:
  - If no provisioner annotation was found on the PVC, do not provision; return `false, nil`.
In short, the function decides whether to provision a volume for the PVC by examining the PVC's fields and annotations together with the configuration of its StorageClass: whether a volume is already assigned, whether the provisioning preconditions are met, whether delayed binding is in use, and so on.
// shouldProvision returns whether a claim should have a volume provisioned for
// it, i.e. whether a Provision is "desired"
func (ctrl *ProvisionController) shouldProvision(ctx context.Context, claim *v1.PersistentVolumeClaim) (bool, error) {
if claim.Spec.VolumeName != "" {
return false, nil
}
if qualifier, ok := ctrl.provisioner.(Qualifier); ok {
if !qualifier.ShouldProvision(ctx, claim) {
return false, nil
}
}
provisioner, found := claim.Annotations[annStorageProvisioner]
if !found {
provisioner, found = claim.Annotations[annBetaStorageProvisioner]
}
if found {
if ctrl.knownProvisioner(provisioner) {
claimClass := util.GetPersistentVolumeClaimClass(claim)
class, err := ctrl.getStorageClass(claimClass)
if err != nil {
return false, err
}
if class.VolumeBindingMode != nil && *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer {
// When claim is in delay binding mode, annSelectedNode is
// required to provision volume.
// Though PV controller set annStorageProvisioner only when
// annSelectedNode is set, but provisioner may remove
// annSelectedNode to notify scheduler to reschedule again.
if selectedNode, ok := claim.Annotations[annSelectedNode]; ok && selectedNode != "" {
return true, nil
}
return false, nil
}
return true, nil
}
}
return false, nil
}
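The `ctrl.provisioner.(Qualifier)` assertion above gives a provisioner implementation a veto before any other checks run. As an illustration, a provisioner could skip claims that carry a hypothetical "on hold" annotation; the annotation key and the type below are made up for this sketch.
package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
)

// annOnHold is a hypothetical annotation, used only for this illustration.
const annOnHold = "example.com/provisioning-on-hold"

// onHoldQualifier implements the Qualifier interface checked in
// shouldProvision() above; it would normally be implemented by the same type
// that implements Provision and Delete.
type onHoldQualifier struct{}

// ShouldProvision returns false for claims explicitly put on hold, so the
// controller skips them without reporting an error.
func (q onHoldQualifier) ShouldProvision(ctx context.Context, claim *v1.PersistentVolumeClaim) bool {
	return claim.Annotations[annOnHold] != "true"
}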
provisionClaimOperation()
- Get the PVC's storage class:
  - `util.GetPersistentVolumeClaimClass(claim)` returns the PVC's StorageClass name.
- Logging:
  - klog is used for logging, with the PVC and StorageClass attached as structured values.
- Check whether the PV already exists:
  - `ctrl.getProvisionedVolumeNameForClaim(claim)` computes the expected PV name, which is then looked up in `ctrl.volumes`. If it exists, the PV has already been provisioned and the function returns `ProvisioningFinished` with `errStopProvision`.
- Build a reference to the PVC:
  - `ref.GetReference(scheme.Scheme, claim)` produces a reference to the PVC for use later on.
- Check whether provisioning is allowed:
  - `ctrl.canProvision(ctx, claim)` checks whether this ProvisionController can handle the claim. If not, an event is recorded and an error is returned.
- Get the StorageClass:
  - `ctrl.getStorageClass(claimClass)` fetches the StorageClass named by the PVC. If that fails, or the class's provisioner is not one this ProvisionController supports, the error is logged and returned.
- Get the selected node:
  - If the PVC annotations name a selected node (`annSelectedNode` or `annAlphaSelectedNode`), that node is looked up. If it does not exist, `ctrl.provisionVolumeErrorHandling` handles the error.
- Prepare the provisioning options:
  - A `ProvisionOptions` value is built from the StorageClass, PV name, PVC object, and selected node.
- Record a Normal event:
  - `ctrl.eventRecorder.Event` records a Normal event stating that the external provisioner is provisioning a volume for the PVC.
- Call the provisioner:
  - `ctrl.provisioner.Provision(ctx, options)` attempts to provision the volume. On failure, the error is handled according to its type.
- Set the claim reference and finalizer:
  - On success, the PV's `ClaimRef` is set to the PVC reference, and the external-provisioner finalizer is added if configured.
- Update the PV's metadata and storage class:
  - The PV's annotations and storage class name are updated.
- Store and add the PV:
  - `ctrl.volumeStore.StoreVolume` saves the PV, which is then added to `ctrl.volumes`.
- Return:
  - If everything succeeded, the function returns `ProvisioningFinished` and `nil`.
This function covers the whole flow from checking whether the PV already exists, through actually provisioning the volume, to updating internal state and recording events. It is the key step in the dynamic provisioning path that makes sure a PVC is handled and its storage provisioned correctly.
func (ctrl *ProvisionController) provisionClaimOperation(ctx context.Context, claim *v1.PersistentVolumeClaim) (ProvisioningState, error) {
// Most code here is identical to that found in controller.go of kube's PV controller...
claimClass := util.GetPersistentVolumeClaimClass(claim)
logger := klog.LoggerWithValues(klog.FromContext(ctx), "PVC", klog.KObj(claim), "StorageClass", claimClass)
logger.V(4).Info("Started")
// A previous doProvisionClaim may just have finished while we were waiting for
// the locks. Check that PV (with deterministic name) hasn't been provisioned
// yet.
pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
_, exists, err := ctrl.volumes.GetByKey(pvName)
if err == nil && exists {
// Volume has been already provisioned, nothing to do.
logger.V(4).Info("PersistentVolume already exists, skipping", "PV", pvName)
return ProvisioningFinished, errStopProvision
}
// Prepare a claimRef to the claim early (to fail before a volume is
// provisioned)
claimRef, err := ref.GetReference(scheme.Scheme, claim)
if err != nil {
logger.Error(err, "Unexpected error getting claim reference")
return ProvisioningNoChange, err
}
// Check if this provisioner can provision this claim.
if err = ctrl.canProvision(ctx, claim); err != nil {
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
logger.Error(err, "Failed to provision volume")
return ProvisioningFinished, errStopProvision
}
// For any issues getting fields from StorageClass (including reclaimPolicy & mountOptions),
// retry the claim because the storageClass can be fixed/(re)created independently of the claim
class, err := ctrl.getStorageClass(claimClass)
if err != nil {
logger.Error(err, "Error getting claim's StorageClass's fields")
return ProvisioningFinished, err
}
if !ctrl.knownProvisioner(class.Provisioner) {
// class.Provisioner has either changed since shouldProvision() or
// annDynamicallyProvisioned contains different provisioner than
// class.Provisioner.
logger.Error(nil, "Unknown provisioner requested in claim's StorageClass", "provisioner", class.Provisioner)
return ProvisioningFinished, errStopProvision
}
var selectedNode *v1.Node
// Get SelectedNode
if nodeName, ok := getString(claim.Annotations, annSelectedNode, annAlphaSelectedNode); ok {
if ctrl.nodeLister != nil {
selectedNode, err = ctrl.nodeLister.Get(nodeName)
} else {
selectedNode, err = ctrl.client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) // TODO (verult) cache Nodes
}
if err != nil {
// if node does not exist, reschedule and remove volume.kubernetes.io/selected-node annotation
if apierrs.IsNotFound(err) {
ctx2 := klog.NewContext(ctx, logger)
return ctrl.provisionVolumeErrorHandling(ctx2, ProvisioningReschedule, err, claim)
}
err = fmt.Errorf("failed to get target node: %v", err)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
return ProvisioningNoChange, err
}
}
options := ProvisionOptions{
StorageClass: class,
PVName: pvName,
PVC: claim,
SelectedNode: selectedNode,
}
ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", klog.KObj(claim)))
volume, result, err := ctrl.provisioner.Provision(ctx, options)
if err != nil {
if ierr, ok := err.(*IgnoredError); ok {
// Provision ignored, do nothing and hope another provisioner will provision it.
logger.V(4).Info("Volume provision ignored", "reason", ierr)
return ProvisioningFinished, errStopProvision
}
ctx2 := klog.NewContext(ctx, logger)
err = fmt.Errorf("failed to provision volume with StorageClass %q: %v", claimClass, err)
return ctrl.provisionVolumeErrorHandling(ctx2, result, err, claim)
}
logger.V(4).Info("Volume is provisioned", "PV", volume.Name)
// Set ClaimRef and the PV controller will bind and set annBoundByController for us
volume.Spec.ClaimRef = claimRef
// Add external provisioner finalizer if it doesn't already have it
if ctrl.addFinalizer && !ctrl.checkFinalizer(volume, finalizerPV) {
volume.ObjectMeta.Finalizers = append(volume.ObjectMeta.Finalizers, finalizerPV)
}
metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annDynamicallyProvisioned, class.Provisioner)
volume.Spec.StorageClassName = claimClass
logger.V(4).Info("Succeeded")
if err := ctrl.volumeStore.StoreVolume(logger, claim, volume); err != nil {
return ProvisioningFinished, err
}
if err = ctrl.volumes.Add(volume); err != nil {
utilruntime.HandleError(err)
}
return ProvisioningFinished, nil
}
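For completeness, here is a minimal sketch of what `ctrl.provisioner.Provision(ctx, options)` is expected to return: a PersistentVolume (without `ClaimRef`, finalizer, `annDynamicallyProvisioned`, or `StorageClassName`, all of which provisionClaimOperation fills in itself), a ProvisioningState, and an error. The hostpath-style volume source, the type name, and the `Delete` signature are assumptions for illustration, not the CSI provisioner's implementation.
package example

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	// The module version in this import path is illustrative.
	"sigs.k8s.io/sig-storage-lib-external-provisioner/v10/controller"
)

// exampleProvisioner is a hypothetical Provisioner; only Provision is fleshed out.
type exampleProvisioner struct {
	basePath string
}

// Provision describes a volume for the claim in options; the controller takes
// care of binding it to the claim and saving it to the API server.
func (p *exampleProvisioner) Provision(ctx context.Context, options controller.ProvisionOptions) (*v1.PersistentVolume, controller.ProvisioningState, error) {
	capacity, ok := options.PVC.Spec.Resources.Requests[v1.ResourceStorage]
	if !ok {
		return nil, controller.ProvisioningFinished, fmt.Errorf("claim %s has no storage request", options.PVC.Name)
	}
	reclaimPolicy := v1.PersistentVolumeReclaimDelete
	if options.StorageClass.ReclaimPolicy != nil {
		reclaimPolicy = *options.StorageClass.ReclaimPolicy
	}
	pv := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{
			Name: options.PVName, // deterministic name chosen by the controller
		},
		Spec: v1.PersistentVolumeSpec{
			PersistentVolumeReclaimPolicy: reclaimPolicy,
			AccessModes:                   options.PVC.Spec.AccessModes,
			Capacity:                      v1.ResourceList{v1.ResourceStorage: capacity},
			PersistentVolumeSource: v1.PersistentVolumeSource{
				HostPath: &v1.HostPathVolumeSource{Path: p.basePath + "/" + options.PVName},
			},
		},
	}
	return pv, controller.ProvisioningFinished, nil
}

// Delete would release the backing storage for the PV; the signature is
// assumed from current versions of the library.
func (p *exampleProvisioner) Delete(ctx context.Context, pv *v1.PersistentVolume) error {
	return nil
}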