@@ -31,7 +31,10 @@ import (
3131 "github.com/aliyun/alibaba-cloud-sdk-go/services/ecs"
3232 "github.com/container-storage-interface/spec/lib/go/csi"
3333 "github.com/golang/protobuf/ptypes/timestamp"
34+ "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/wrap"
3435 "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/common"
36+ "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc"
37+ "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/waitstatus"
3538 "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/features"
3639 "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
3740 "google.golang.org/grpc/codes"
@@ -43,11 +46,13 @@ import (
4346 "k8s.io/apimachinery/pkg/util/sets"
4447 "k8s.io/client-go/tools/record"
4548 "k8s.io/klog/v2"
49+ "k8s.io/utils/clock"
4650)
4751
4852// controller server try to create/delete volumes/snapshots
4953type controllerServer struct {
5054 recorder record.EventRecorder
55+ modify ModifyServer
5156 common.GenericControllerServer
5257}
5358
@@ -81,10 +86,24 @@ var veasp = struct {
8186
8287var delVolumeSnap sync.Map
8388
89+ func newTaskStatusWaiter () waitstatus.StatusWaiter [ecs.Task ] {
90+ client := desc.Task {Client : GlobalConfigVar .EcsClient }
91+ waiter := waitstatus .NewBatched (client , clock.RealClock {}, 3 * time .Second , 10 * time .Second )
92+ waiter .PollHook = func () desc.Client [ecs.Task ] {
93+ return desc.Task {Client : updateEcsClient (GlobalConfigVar .EcsClient )}
94+ }
95+ go waiter .Run (context .Background ())
96+ return waiter
97+ }
98+
8499// NewControllerServer is to create controller server
85100func NewControllerServer () csi.ControllerServer {
86101 c := & controllerServer {
87102 recorder : utils .NewEventRecorder (),
103+ modify : ModifyServer {
104+ ecsClient : GlobalConfigVar .EcsClient ,
105+ taskWaiter : newTaskStatusWaiter (),
106+ },
88107 }
89108 return c
90109}
@@ -97,6 +116,7 @@ func (cs *controllerServer) ControllerGetCapabilities(ctx context.Context, req *
97116 csi .ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT ,
98117 csi .ControllerServiceCapability_RPC_LIST_SNAPSHOTS ,
99118 csi .ControllerServiceCapability_RPC_EXPAND_VOLUME ,
119+ csi .ControllerServiceCapability_RPC_MODIFY_VOLUME ,
100120 ),
101121 }, nil
102122}
@@ -137,6 +157,14 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
137157 return nil , status .Errorf (codes .InvalidArgument , "Invalid parameters from input: %v, with error: %v" , req .Name , err )
138158 }
139159
160+ if len (req .MutableParameters ) > 0 {
161+ mutable , err := parseMutableParameters (req .MutableParameters )
162+ if err != nil {
163+ return nil , status .Errorf (codes .InvalidArgument , "Invalid mutable parameters: %v" , err )
164+ }
165+ importMutableParameters (diskVol , & mutable )
166+ }
167+
140168 // 兼容 serverless 拓扑感知场景;
141169 // req参数里面包含了云盘ID,则直接使用云盘ID进行返回;
142170 csiVolume , err := staticVolumeCreate (req , snapshotID )
@@ -971,3 +999,18 @@ func (cs *controllerServer) deleteUntagAutoSnapshot(snapshotID, diskID string) {
971999 klog .Errorf ("ControllerExpandVolume:: failed to untag volumeExpandAutoSnapshot: %s" , err .Error ())
9721000 }
9731001}
1002+
1003+ func (cs * controllerServer ) ControllerModifyVolume (ctx context.Context , req * csi.ControllerModifyVolumeRequest ) (* csi.ControllerModifyVolumeResponse , error ) {
1004+ params , err := parseMutableParameters (req .MutableParameters )
1005+ if err != nil {
1006+ return nil , status .Error (codes .InvalidArgument , err .Error ())
1007+ }
1008+ err = cs .modify .Modify (ctx , req .VolumeId , params )
1009+ if err != nil {
1010+ if errors .Is (err , wrap .ErrorCode ("InvalidDiskId.NotFound" )) {
1011+ return nil , status .Error (codes .NotFound , err .Error ())
1012+ }
1013+ return nil , err
1014+ }
1015+ return & csi.ControllerModifyVolumeResponse {}, nil
1016+ }
0 commit comments