Skip to content

Commit b9969f9

Browse files
committed
Merge branch 'feature/tcproute-udproute'
2 parents 36f83d7 + 728139f commit b9969f9

21 files changed

+848
-56
lines changed

internal/controller/manager.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,18 @@ func registerControllers(
532532
controller.WithK8sPredicate(k8spredicate.GenerationChangedPredicate{}),
533533
},
534534
},
535+
{
536+
objectType: &gatewayv1alpha2.TCPRoute{},
537+
options: []controller.Option{
538+
controller.WithK8sPredicate(k8spredicate.GenerationChangedPredicate{}),
539+
},
540+
},
541+
{
542+
objectType: &gatewayv1alpha2.UDPRoute{},
543+
options: []controller.Option{
544+
controller.WithK8sPredicate(k8spredicate.GenerationChangedPredicate{}),
545+
},
546+
},
535547
}
536548
controllerRegCfgs = append(controllerRegCfgs, gwExpFeatures...)
537549
}
@@ -758,6 +770,8 @@ func prepareFirstEventBatchPreparerArgs(cfg config.Config) ([]client.Object, []c
758770
&gatewayv1alpha3.BackendTLSPolicyList{},
759771
&apiv1.ConfigMapList{},
760772
&gatewayv1alpha2.TLSRouteList{},
773+
&gatewayv1alpha2.TCPRouteList{},
774+
&gatewayv1alpha2.UDPRouteList{},
761775
)
762776
}
763777

internal/controller/nginx/config/stream/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ type Server struct {
1414
RewriteClientIP shared.RewriteClientIPSettings
1515
SSLPreread bool
1616
IsSocket bool
17+
Protocol string
18+
UDPConfig *UDPConfig
19+
}
20+
21+
type UDPConfig struct {
22+
ProxyTimeout string
1723
}
1824

1925
// Upstream holds all configuration for a stream upstream.

internal/controller/nginx/config/stream_servers.go

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,24 @@ func (g GeneratorImpl) executeStreamServers(conf dataplane.Configuration) []exec
3333
}
3434

3535
func createStreamServers(conf dataplane.Configuration) []stream.Server {
36-
if len(conf.TLSPassthroughServers) == 0 {
36+
totalServers := len(conf.TLSPassthroughServers) + len(conf.TCPServers) + len(conf.UDPServers)
37+
if totalServers == 0 {
3738
return nil
3839
}
3940

40-
streamServers := make([]stream.Server, 0, len(conf.TLSPassthroughServers)*2)
41+
streamServers := make([]stream.Server, 0, totalServers*2)
4142
portSet := make(map[int32]struct{})
4243
upstreams := make(map[string]dataplane.Upstream)
4344

4445
for _, u := range conf.StreamUpstreams {
4546
upstreams[u.Name] = u
4647
}
48+
for _, u := range conf.TCPUpstreams {
49+
upstreams[u.Name] = u
50+
}
51+
for _, u := range conf.UDPUpstreams {
52+
upstreams[u.Name] = u
53+
}
4754

4855
for _, server := range conf.TLSPassthroughServers {
4956
if u, ok := upstreams[server.UpstreamName]; ok && server.UpstreamName != "" {
@@ -77,6 +84,47 @@ func createStreamServers(conf dataplane.Configuration) []stream.Server {
7784
}
7885
streamServers = append(streamServers, streamServer)
7986
}
87+
88+
// Process TCP servers
89+
for i, server := range conf.TCPServers {
90+
if _, inPortSet := portSet[server.Port]; inPortSet {
91+
continue // Skip if port already in use
92+
}
93+
94+
if u, ok := upstreams[server.UpstreamName]; ok && server.UpstreamName != "" && len(u.Endpoints) > 0 {
95+
streamServer := stream.Server{
96+
Listen: fmt.Sprint(server.Port),
97+
StatusZone: fmt.Sprintf("tcp_%d", server.Port),
98+
ProxyPass: server.UpstreamName,
99+
}
100+
streamServers = append(streamServers, streamServer)
101+
portSet[server.Port] = struct{}{}
102+
} else {
103+
fmt.Printf("DEBUG: createStreamServers - TCP Server %d: Skipped - upstream not found or no endpoints\n", i)
104+
}
105+
}
106+
107+
// Process UDP servers
108+
for _, server := range conf.UDPServers {
109+
if _, inPortSet := portSet[server.Port]; inPortSet {
110+
continue // Skip if port already in use
111+
}
112+
113+
if u, ok := upstreams[server.UpstreamName]; ok && server.UpstreamName != "" && len(u.Endpoints) > 0 {
114+
streamServer := stream.Server{
115+
Listen: fmt.Sprintf("%d udp", server.Port),
116+
StatusZone: fmt.Sprintf("udp_%d", server.Port),
117+
ProxyPass: server.UpstreamName,
118+
Protocol: "udp",
119+
UDPConfig: &stream.UDPConfig{
120+
ProxyTimeout: "1s",
121+
},
122+
}
123+
streamServers = append(streamServers, streamServer)
124+
portSet[server.Port] = struct{}{}
125+
}
126+
}
127+
80128
return streamServers
81129
}
82130

internal/controller/nginx/config/stream_servers_template.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ server {
3535
{{- if $s.SSLPreread }}
3636
ssl_preread on;
3737
{{- end }}
38+
39+
{{- if and (eq $s.Protocol "udp") $s.UDPConfig }}
40+
proxy_timeout {{ $s.UDPConfig.ProxyTimeout }};
41+
{{- end }}
3842
}
3943
{{- end }}
4044

internal/controller/nginx/config/upstreams.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,13 @@ func executeUpstreams(upstreams []http.Upstream) []executeResult {
6969
}
7070

7171
func (g GeneratorImpl) executeStreamUpstreams(conf dataplane.Configuration) []executeResult {
72-
upstreams := g.createStreamUpstreams(conf.StreamUpstreams)
72+
// Combine all stream upstreams: TLS, TCP, and UDP
73+
allUpstreams := make([]dataplane.Upstream, 0, len(conf.StreamUpstreams)+len(conf.TCPUpstreams)+len(conf.UDPUpstreams))
74+
allUpstreams = append(allUpstreams, conf.StreamUpstreams...)
75+
allUpstreams = append(allUpstreams, conf.TCPUpstreams...)
76+
allUpstreams = append(allUpstreams, conf.UDPUpstreams...)
77+
78+
upstreams := g.createStreamUpstreams(allUpstreams)
7379

7480
result := executeResult{
7581
dest: streamConfigFile,

internal/controller/provisioner/objects.go

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ const (
4444
defaultInitialDelaySeconds = int32(3)
4545
)
4646

47+
type PortInfo struct {
48+
Port int32
49+
Protocol corev1.Protocol
50+
}
51+
4752
var emptyDirVolumeSource = corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}
4853

4954
//nolint:gocyclo // will refactor at some point
@@ -147,9 +152,18 @@ func (p *NginxProvisioner) buildNginxResourceObjects(
147152
openshiftObjs = p.buildOpenshiftObjects(objectMeta)
148153
}
149154

150-
ports := make(map[int32]struct{})
155+
ports := make(map[int32]PortInfo)
151156
for _, listener := range gateway.Spec.Listeners {
152-
ports[int32(listener.Port)] = struct{}{}
157+
var protocol corev1.Protocol
158+
switch listener.Protocol {
159+
case gatewayv1.TCPProtocolType:
160+
protocol = corev1.ProtocolTCP
161+
case gatewayv1.UDPProtocolType:
162+
protocol = corev1.ProtocolUDP
163+
default:
164+
protocol = corev1.ProtocolTCP
165+
}
166+
ports[int32(listener.Port)] = PortInfo{Port: int32(listener.Port), Protocol: protocol}
153167
}
154168

155169
// Create separate copies of objectMeta for service and deployment to avoid shared map references
@@ -515,7 +529,7 @@ func (p *NginxProvisioner) buildOpenshiftObjects(objectMeta metav1.ObjectMeta) [
515529
func buildNginxService(
516530
objectMeta metav1.ObjectMeta,
517531
nProxyCfg *graph.EffectiveNginxProxy,
518-
ports map[int32]struct{},
532+
ports map[int32]PortInfo,
519533
selectorLabels map[string]string,
520534
addresses []gatewayv1.GatewaySpecAddress,
521535
) (*corev1.Service, error) {
@@ -538,16 +552,17 @@ func buildNginxService(
538552
}
539553

540554
servicePorts := make([]corev1.ServicePort, 0, len(ports))
541-
for port := range ports {
555+
for _, portInfo := range ports {
542556
servicePort := corev1.ServicePort{
543-
Name: fmt.Sprintf("port-%d", port),
544-
Port: port,
545-
TargetPort: intstr.FromInt32(port),
557+
Name: fmt.Sprintf("port-%d", portInfo.Port),
558+
Port: portInfo.Port,
559+
TargetPort: intstr.FromInt32(portInfo.Port),
560+
Protocol: portInfo.Protocol,
546561
}
547562

548563
if serviceType != corev1.ServiceTypeClusterIP {
549564
for _, nodePort := range serviceCfg.NodePorts {
550-
if nodePort.ListenerPort == port {
565+
if nodePort.ListenerPort == portInfo.Port {
551566
servicePort.NodePort = nodePort.Port
552567
}
553568
}
@@ -625,7 +640,7 @@ func (p *NginxProvisioner) buildNginxDeployment(
625640
nProxyCfg *graph.EffectiveNginxProxy,
626641
ngxIncludesConfigMapName string,
627642
ngxAgentConfigMapName string,
628-
ports map[int32]struct{},
643+
ports map[int32]PortInfo,
629644
selectorLabels map[string]string,
630645
agentTLSSecretName string,
631646
dockerSecretNames map[string]string,
@@ -779,7 +794,7 @@ func (p *NginxProvisioner) buildNginxPodTemplateSpec(
779794
nProxyCfg *graph.EffectiveNginxProxy,
780795
ngxIncludesConfigMapName string,
781796
ngxAgentConfigMapName string,
782-
ports map[int32]struct{},
797+
ports map[int32]PortInfo,
783798
agentTLSSecretName string,
784799
dockerSecretNames map[string]string,
785800
jwtSecretName string,
@@ -788,10 +803,11 @@ func (p *NginxProvisioner) buildNginxPodTemplateSpec(
788803
dataplaneKeySecretName string,
789804
) corev1.PodTemplateSpec {
790805
containerPorts := make([]corev1.ContainerPort, 0, len(ports))
791-
for port := range ports {
806+
for _, portInfo := range ports {
792807
containerPort := corev1.ContainerPort{
793-
Name: fmt.Sprintf("port-%d", port),
794-
ContainerPort: port,
808+
Name: fmt.Sprintf("port-%d", portInfo.Port),
809+
ContainerPort: portInfo.Port,
810+
Protocol: portInfo.Protocol,
795811
}
796812
containerPorts = append(containerPorts, containerPort)
797813
}

internal/controller/state/change_processor.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ func NewChangeProcessorImpl(cfg ChangeProcessorConfig) *ChangeProcessorImpl {
9696
NginxProxies: make(map[types.NamespacedName]*ngfAPIv1alpha2.NginxProxy),
9797
GRPCRoutes: make(map[types.NamespacedName]*v1.GRPCRoute),
9898
TLSRoutes: make(map[types.NamespacedName]*v1alpha2.TLSRoute),
99+
TCPRoutes: make(map[types.NamespacedName]*v1alpha2.TCPRoute),
100+
UDPRoutes: make(map[types.NamespacedName]*v1alpha2.UDPRoute),
99101
NGFPolicies: make(map[graph.PolicyKey]policies.Policy),
100102
SnippetsFilters: make(map[types.NamespacedName]*ngfAPIv1alpha1.SnippetsFilter),
101103
}
@@ -211,6 +213,16 @@ func NewChangeProcessorImpl(cfg ChangeProcessorConfig) *ChangeProcessorImpl {
211213
store: newObjectStoreMapAdapter(clusterStore.TLSRoutes),
212214
predicate: nil,
213215
},
216+
{
217+
gvk: cfg.MustExtractGVK(&v1alpha2.TCPRoute{}),
218+
store: newObjectStoreMapAdapter(clusterStore.TCPRoutes),
219+
predicate: nil,
220+
},
221+
{
222+
gvk: cfg.MustExtractGVK(&v1alpha2.UDPRoute{}),
223+
store: newObjectStoreMapAdapter(clusterStore.UDPRoutes),
224+
predicate: nil,
225+
},
214226
{
215227
gvk: cfg.MustExtractGVK(&ngfAPIv1alpha1.SnippetsFilter{}),
216228
store: newObjectStoreMapAdapter(clusterStore.SnippetsFilters),

internal/controller/state/change_processor_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3776,7 +3776,7 @@ var _ = Describe("ChangeProcessor", func() {
37763776
},
37773777
Entry(
37783778
"an unsupported resource",
3779-
&v1alpha2.TCPRoute{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "tcp"}},
3779+
&apiv1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "pod"}},
37803780
),
37813781
Entry(
37823782
"nil resource",
@@ -3794,8 +3794,8 @@ var _ = Describe("ChangeProcessor", func() {
37943794
},
37953795
Entry(
37963796
"an unsupported resource",
3797-
&v1alpha2.TCPRoute{},
3798-
types.NamespacedName{Namespace: "test", Name: "tcp"},
3797+
&apiv1.Pod{},
3798+
types.NamespacedName{Namespace: "test", Name: "pod"},
37993799
),
38003800
Entry(
38013801
"nil resource type",

0 commit comments

Comments
 (0)