Skip to content

Commit e51184e

Browse files
authored
Merge pull request #347 from orozery/portmanager
controlplane/control: Support port updates
2 parents 4609ee0 + 16d7d90 commit e51184e

File tree

8 files changed

+76
-30
lines changed

8 files changed

+76
-30
lines changed

cmd/cl-controlplane/app/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func (o *Options) Run() error {
175175
return fmt.Errorf("cannot create authz controllers: %w", err)
176176
}
177177

178-
controlManager := control.NewManager(mgr.GetClient())
178+
controlManager := control.NewManager(mgr.GetClient(), o.CRDMode)
179179

180180
xdsManager := xds.NewManager()
181181
xds.RegisterService(

config/crds/clusterlink.net_imports.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ spec:
6060
type: object
6161
type: array
6262
targetPort:
63-
description: TargetPort of the imported service.
63+
description: TargetPort of the imported service. This is the internal
64+
(non user-facing) listening port used by the dataplane pods.
6465
type: integer
6566
required:
6667
- port

config/operator/rbac/role.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ rules:
5757
- patch
5858
- update
5959
- watch
60+
- apiGroups:
61+
- clusterlink.net
62+
resources:
63+
- imports
64+
verbs:
65+
- update
6066
- apiGroups:
6167
- clusterlink.net
6268
resources:

pkg/apis/clusterlink.net/v1alpha1/import.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ type ImportSpec struct {
4444
// Port of the imported service.
4545
Port uint16 `json:"port"`
4646
// TargetPort of the imported service.
47+
// This is the internal (non user-facing) listening port used by the dataplane pods.
4748
TargetPort uint16 `json:"targetPort,omitempty"`
4849
// Sources to import from.
4950
Sources []ImportSource `json:"sources"`

pkg/bootstrap/platform/k8s.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,11 @@ rules:
276276
- apiGroups: [""]
277277
resources: ["pods"]
278278
verbs: ["get", "list", "watch"]
279+
{{ if .crdMode }}
280+
- apiGroups: ["clusterlink.net"]
281+
resources: ["imports"]
282+
verbs: ["update"]
283+
{{ end }}
279284
---
280285
apiVersion: rbac.authorization.k8s.io/v1
281286
kind: ClusterRoleBinding

pkg/controlplane/control/manager.go

Lines changed: 48 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,9 @@ import (
3636
// This includes target port generation for imported services, as well as
3737
// k8s service creation per imported service.
3838
type Manager struct {
39-
client client.Client
40-
ports *portManager
39+
client client.Client
40+
crdMode bool
41+
ports *portManager
4142

4243
logger *logrus.Entry
4344
}
@@ -103,14 +104,6 @@ func (m *Manager) DeleteLegacyExport(namespace string, exportSpec *api.ExportSpe
103104
func (m *Manager) AddImport(ctx context.Context, imp *v1alpha1.Import) error {
104105
m.logger.Infof("Adding import '%s/%s'.", imp.Namespace, imp.Name)
105106

106-
// TODO: port manager should map ports to imports, and be able to detect conflicts
107-
port, err := m.ports.Lease(imp.Spec.TargetPort)
108-
if err != nil {
109-
return fmt.Errorf("cannot generate listening port: %w", err)
110-
}
111-
112-
imp.Spec.TargetPort = port
113-
114107
newService := &v1.Service{
115108
ObjectMeta: metav1.ObjectMeta{
116109
Name: imp.Name,
@@ -130,26 +123,59 @@ func (m *Manager) AddImport(ctx context.Context, imp *v1alpha1.Import) error {
130123
}
131124

132125
var oldService v1.Service
133-
err = m.client.Get(
126+
var create bool
127+
err := m.client.Get(
134128
ctx,
135129
types.NamespacedName{
136130
Name: imp.Name,
137131
Namespace: imp.Namespace,
138132
},
139133
&oldService)
140134
if err != nil {
141-
if errors.IsNotFound(err) {
142-
return m.client.Create(ctx, newService)
135+
if !errors.IsNotFound(err) {
136+
return err
143137
}
144138

145-
return err
139+
create = true
146140
}
147141

148-
if serviceChanged(&oldService, newService) {
149-
return m.client.Update(ctx, newService)
142+
// if service exists, and import specifies a random (0) target port,
143+
// then use existing service target port instead of allocating a new port
144+
if !create && len(oldService.Spec.Ports) == 1 && imp.Spec.TargetPort == 0 {
145+
imp.Spec.TargetPort = uint16(oldService.Spec.Ports[0].TargetPort.IntVal)
150146
}
151147

152-
return nil
148+
newPort := imp.Spec.TargetPort == 0
149+
150+
fullName := imp.Namespace + "/" + imp.Name
151+
port, err := m.ports.Lease(fullName, imp.Spec.TargetPort)
152+
if err != nil {
153+
return fmt.Errorf("cannot generate listening port: %w", err)
154+
}
155+
156+
if newPort {
157+
imp.Spec.TargetPort = port
158+
newService.Spec.Ports[0].TargetPort = intstr.FromInt32(int32(port))
159+
160+
if m.crdMode {
161+
if err := m.client.Update(ctx, imp); err != nil {
162+
m.ports.Release(port)
163+
return err
164+
}
165+
}
166+
}
167+
168+
if create {
169+
err = m.client.Create(ctx, newService)
170+
} else if serviceChanged(&oldService, newService) {
171+
err = m.client.Update(ctx, newService)
172+
}
173+
174+
if err != nil && newPort {
175+
m.ports.Release(port)
176+
}
177+
178+
return err
153179
}
154180

155181
// DeleteImport removes the listening socket of a previously imported service.
@@ -193,12 +219,13 @@ func serviceChanged(svc1, svc2 *v1.Service) bool {
193219
}
194220

195221
// NewManager returns a new control manager.
196-
func NewManager(cl client.Client) *Manager {
222+
func NewManager(cl client.Client, crdMode bool) *Manager {
197223
logger := logrus.WithField("component", "controlplane.control.manager")
198224

199225
return &Manager{
200-
client: cl,
201-
ports: newPortManager(),
202-
logger: logger,
226+
client: cl,
227+
crdMode: crdMode,
228+
ports: newPortManager(),
229+
logger: logger,
203230
}
204231
}

pkg/controlplane/control/portmanager.go renamed to pkg/controlplane/control/port.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ const (
3838
// portManager leases ports for use by imported services.
3939
type portManager struct {
4040
lock sync.Mutex
41-
leasedPorts map[uint16]struct{}
41+
leasedPorts map[uint16]string
4242

4343
logger *logrus.Entry
4444
}
@@ -74,8 +74,8 @@ func (m *portManager) getRandomFreePort() uint16 {
7474
return port
7575
}
7676

77-
// Lease marks a port as taken. If port is 0, some random free port is returned.
78-
func (m *portManager) Lease(port uint16) (uint16, error) {
77+
// Lease marks a port as taken by the given name. If port is 0, some random free port is returned.
78+
func (m *portManager) Lease(name string, port uint16) (uint16, error) {
7979
m.logger.Infof("Leasing: %d.", port)
8080

8181
m.lock.Lock()
@@ -89,13 +89,13 @@ func (m *portManager) Lease(port uint16) (uint16, error) {
8989
port = m.getRandomFreePort()
9090
m.logger.Infof("Generated port: %d.", port)
9191
} else {
92-
if _, ok := m.leasedPorts[port]; ok {
93-
return 0, fmt.Errorf("port %d is already leased", port)
92+
if leaseName, ok := m.leasedPorts[port]; ok && leaseName != name {
93+
return 0, fmt.Errorf("port %d is already leased to '%s'", port, leaseName)
9494
}
9595
}
9696

9797
// mark port is leased
98-
m.leasedPorts[port] = struct{}{}
98+
m.leasedPorts[port] = name
9999

100100
return port, nil
101101
}
@@ -120,7 +120,7 @@ func newPortManager() *portManager {
120120
).Info("Initialized.")
121121

122122
return &portManager{
123-
leasedPorts: make(map[uint16]struct{}),
123+
leasedPorts: make(map[uint16]string),
124124
logger: logger,
125125
}
126126
}

pkg/operator/controller/instance_controller.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ type InstanceReconciler struct {
7171
// +kubebuilder:rbac:groups="",resources=services;serviceaccounts,verbs=list;get;watch;create;update;patch;delete
7272
// +kubebuilder:rbac:groups="",resources=nodes,verbs=list;get;watch
7373
// +kubebuilder:rbac:groups="",resources=pods,verbs=list;get;watch
74+
// +kubebuilder:rbac:groups=clusterlink.net,resources=imports,verbs=update
7475
// +kubebuilder:rbac:groups="apps",resources=deployments,verbs=list;get;watch;create;update;patch;delete
7576
//nolint:lll // Ignore long line warning for Kubebuilder command.
7677
// +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=clusterroles;clusterrolebindings,verbs=list;get;watch;create;update;patch;delete
@@ -456,6 +457,11 @@ func (r *InstanceReconciler) createAccessControl(ctx context.Context, name, name
456457
Resources: []string{"pods"},
457458
Verbs: []string{"get", "list", "watch"},
458459
},
460+
{
461+
APIGroups: []string{"clusterlink.net"},
462+
Resources: []string{"imports"},
463+
Verbs: []string{"update"},
464+
},
459465
},
460466
}
461467

0 commit comments

Comments
 (0)