From 5c0d768588da67fa56584149cbb2f551e11940b6 Mon Sep 17 00:00:00 2001 From: AlessandroCarbonelli Date: Tue, 11 Feb 2025 16:51:00 +0100 Subject: [PATCH] Added Broker client-side implementation --- .github/workflows/lint.yaml | 4 +- README.md | 1 + apis/network/v1alpha1/broker_status.go | 23 ++ apis/network/v1alpha1/broker_types.go | 67 ++++ .../network/v1alpha1/zz_generated.deepcopy.go | 100 +++++ cmd/network-manager/main.go | 4 +- deployments/node/README.md | 1 + .../node/crds/network.fluidos.eu_brokers.yaml | 225 +++++++++++ .../node-network-manager-ClusterRole.yaml | 31 ++ deployments/node/samples/broker.yaml | 26 ++ .../fluidos-pre-install-hook-network.yaml | 2 +- deployments/node/values.yaml | 1 + docs/implementation/components.md | 12 + docs/implementation/controllers.md | 10 + docs/implementation/customresources.md | 47 +++ docs/installation/installation.md | 49 ++- go.mod | 1 + go.sum | 6 +- pkg/network-manager/broker_client.go | 370 ++++++++++++++++++ pkg/network-manager/network-controller.go | 151 +++++-- quickstart/utils/consumer-values-no-ad.yaml | 3 +- quickstart/utils/consumer-values.yaml | 3 +- quickstart/utils/provider-values-no-ad.yaml | 3 +- quickstart/utils/provider-values.yaml | 3 +- tools/scripts/broker-creation.sh | 75 ++++ 25 files changed, 1170 insertions(+), 48 deletions(-) create mode 100644 apis/network/v1alpha1/broker_status.go create mode 100644 apis/network/v1alpha1/broker_types.go create mode 100644 deployments/node/crds/network.fluidos.eu_brokers.yaml create mode 100644 deployments/node/samples/broker.yaml create mode 100644 pkg/network-manager/broker_client.go create mode 100755 tools/scripts/broker-creation.sh diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index 15c63ff0..6a5faab8 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -17,14 +17,14 @@ jobs: - name: Setup Go uses: actions/setup-go@v4 with: - go-version: "1.21" + go-version: "1.23" cache: false - name: golangci-lint uses: golangci/golangci-lint-action@v3.7.0 with: only-new-issues: true - version: v1.54.2 + version: v1.60 args: --timeout=900s gomodtidy: diff --git a/README.md b/README.md index 845c0a60..d0d60aee 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ This repository contains the FLUIDOS Node, along with its essential components, - [**Peering Candidates**](/docs/implementation/components.md#peering-candidates) - [**REAR Manager**](/docs/implementation/components.md#rear-manager) - [**Contract Manager**](/docs/implementation/components.md#contract-manager) +- [**Network Manager**](/docs/implementation/components.md#network-manager) Please note that this repository is continually updated, with additional components slated for future inclusion. diff --git a/apis/network/v1alpha1/broker_status.go b/apis/network/v1alpha1/broker_status.go new file mode 100644 index 00000000..0b8dba12 --- /dev/null +++ b/apis/network/v1alpha1/broker_status.go @@ -0,0 +1,23 @@ +// Copyright 2022-2025 FLUIDOS Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1alpha1 + +import "github.com/fluidos-project/node/pkg/utils/tools" + +// UpdateStatus updates the status of the broker. +func (broker *Broker) UpdateStatus() { + broker.Status.LastUpdateTime = tools.GetTimeNow() + broker.Status.ExpirationTime = tools.GetExpirationTime(0, 0, 10) +} diff --git a/apis/network/v1alpha1/broker_types.go b/apis/network/v1alpha1/broker_types.go new file mode 100644 index 00000000..a9868709 --- /dev/null +++ b/apis/network/v1alpha1/broker_types.go @@ -0,0 +1,67 @@ +// Copyright 2022-2025 FLUIDOS Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1alpha1 + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// BrokerSpec defines the desired state of Broker. +type BrokerSpec struct { + + // Address of the Broker. + Address string `json:"address"` + Name string `json:"name"` + ClCert *corev1.Secret `json:"clcert"` + CaCert *corev1.Secret `json:"cacert"` + Role string `json:"role"` +} + +// BrokerStatus defines the observed state of Broker. +type BrokerStatus struct { + + // This field represents the expiration time of the Broker. It is used to determine when the Broker is no longer valid. + ExpirationTime string `json:"expirationTime"` + + // This field represents the last update time of the Broker. + LastUpdateTime string `json:"lastUpdateTime"` +} + +//+kubebuilder:object:root=true +//+kubebuilder:subresource:status +//+kubebuilder:resource:shortName=broker;brokers + +// Broker is the Schema for the clusters API. +type Broker struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec BrokerSpec `json:"spec,omitempty"` + Status BrokerStatus `json:"status,omitempty"` +} + +//+kubebuilder:object:root=true + +// BrokerList contains a list of Broker. +type BrokerList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []Broker `json:"items"` +} + +func init() { + SchemeBuilder.Register(&Broker{}, &BrokerList{}) +} diff --git a/apis/network/v1alpha1/zz_generated.deepcopy.go b/apis/network/v1alpha1/zz_generated.deepcopy.go index 7b74368f..273f12f9 100644 --- a/apis/network/v1alpha1/zz_generated.deepcopy.go +++ b/apis/network/v1alpha1/zz_generated.deepcopy.go @@ -19,9 +19,109 @@ package v1alpha1 import ( + "k8s.io/api/core/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Broker) DeepCopyInto(out *Broker) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + out.Status = in.Status +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Broker. +func (in *Broker) DeepCopy() *Broker { + if in == nil { + return nil + } + out := new(Broker) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Broker) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BrokerList) DeepCopyInto(out *BrokerList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Broker, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BrokerList. +func (in *BrokerList) DeepCopy() *BrokerList { + if in == nil { + return nil + } + out := new(BrokerList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *BrokerList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BrokerSpec) DeepCopyInto(out *BrokerSpec) { + *out = *in + if in.ClCert != nil { + in, out := &in.ClCert, &out.ClCert + *out = new(v1.Secret) + (*in).DeepCopyInto(*out) + } + if in.CaCert != nil { + in, out := &in.CaCert, &out.CaCert + *out = new(v1.Secret) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BrokerSpec. +func (in *BrokerSpec) DeepCopy() *BrokerSpec { + if in == nil { + return nil + } + out := new(BrokerSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BrokerStatus) DeepCopyInto(out *BrokerStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BrokerStatus. +func (in *BrokerStatus) DeepCopy() *BrokerStatus { + if in == nil { + return nil + } + out := new(BrokerStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *KnownCluster) DeepCopyInto(out *KnownCluster) { *out = *in diff --git a/cmd/network-manager/main.go b/cmd/network-manager/main.go index 62bdbe0e..c8f38956 100644 --- a/cmd/network-manager/main.go +++ b/cmd/network-manager/main.go @@ -102,11 +102,11 @@ func main() { } // Register the controller - if err = (&networkmanager.KnownClusterReconciler{ + if err = (&networkmanager.BrokerReconciler{ Client: cl, Scheme: mgr.GetScheme(), }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "KnownCluster") + setupLog.Error(err, "unable to create controller", "controller", "Broker") os.Exit(1) } diff --git a/deployments/node/README.md b/deployments/node/README.md index 52e9b6aa..7712c3c0 100644 --- a/deployments/node/README.md +++ b/deployments/node/README.md @@ -34,6 +34,7 @@ A Helm chart for Fluidos Node | networkManager.config.address.thirdOctet | string | `nil` | The third octet of the CNI virtual network subnet | | networkManager.config.multicast.address | string | `"239.11.11.1"` | | | networkManager.config.multicast.port | int | `4000` | | +| networkManager.config.netInterface | string | `"eth0"` | | | networkManager.imageName | string | `"ghcr.io/fluidos-project/network-manager"` | | | networkManager.pod.annotations | object | `{}` | Annotations for the network-manager pod. | | networkManager.pod.extraArgs | list | `[]` | Extra arguments for the network-manager pod. | diff --git a/deployments/node/crds/network.fluidos.eu_brokers.yaml b/deployments/node/crds/network.fluidos.eu_brokers.yaml new file mode 100644 index 00000000..675efcf5 --- /dev/null +++ b/deployments/node/crds/network.fluidos.eu_brokers.yaml @@ -0,0 +1,225 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.14.0 + name: brokers.network.fluidos.eu +spec: + group: network.fluidos.eu + names: + kind: Broker + listKind: BrokerList + plural: brokers + shortNames: + - broker + - brokers + singular: broker + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: Broker is the Schema for the clusters API. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: BrokerSpec defines the desired state of Broker. + properties: + address: + description: Address of the Broker. + type: string + cacert: + description: |- + Secret holds secret data of a certain type. The total bytes of the values in + the Data field must be less than MaxSecretSize bytes. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + data: + additionalProperties: + format: byte + type: string + description: |- + Data contains the secret data. Each key must consist of alphanumeric + characters, '-', '_' or '.'. The serialized form of the secret data is a + base64 encoded string, representing the arbitrary (possibly non-string) + data value here. Described in https://tools.ietf.org/html/rfc4648#section-4 + type: object + immutable: + description: |- + Immutable, if set to true, ensures that data stored in the Secret cannot + be updated (only object metadata can be modified). + If not set to true, the field can be modified at any time. + Defaulted to nil. + type: boolean + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + description: |- + Standard object's metadata. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata + properties: + annotations: + additionalProperties: + type: string + type: object + finalizers: + items: + type: string + type: array + labels: + additionalProperties: + type: string + type: object + name: + type: string + namespace: + type: string + type: object + stringData: + additionalProperties: + type: string + description: |- + stringData allows specifying non-binary secret data in string form. + It is provided as a write-only input field for convenience. + All keys and values are merged into the data field on write, overwriting any existing values. + The stringData field is never output when reading from the API. + type: object + type: + description: |- + Used to facilitate programmatic handling of secret data. + More info: https://kubernetes.io/docs/concepts/configuration/secret/#secret-types + type: string + type: object + clcert: + description: |- + Secret holds secret data of a certain type. The total bytes of the values in + the Data field must be less than MaxSecretSize bytes. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + data: + additionalProperties: + format: byte + type: string + description: |- + Data contains the secret data. Each key must consist of alphanumeric + characters, '-', '_' or '.'. The serialized form of the secret data is a + base64 encoded string, representing the arbitrary (possibly non-string) + data value here. Described in https://tools.ietf.org/html/rfc4648#section-4 + type: object + immutable: + description: |- + Immutable, if set to true, ensures that data stored in the Secret cannot + be updated (only object metadata can be modified). + If not set to true, the field can be modified at any time. + Defaulted to nil. + type: boolean + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + description: |- + Standard object's metadata. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata + properties: + annotations: + additionalProperties: + type: string + type: object + finalizers: + items: + type: string + type: array + labels: + additionalProperties: + type: string + type: object + name: + type: string + namespace: + type: string + type: object + stringData: + additionalProperties: + type: string + description: |- + stringData allows specifying non-binary secret data in string form. + It is provided as a write-only input field for convenience. + All keys and values are merged into the data field on write, overwriting any existing values. + The stringData field is never output when reading from the API. + type: object + type: + description: |- + Used to facilitate programmatic handling of secret data. + More info: https://kubernetes.io/docs/concepts/configuration/secret/#secret-types + type: string + type: object + name: + type: string + role: + type: string + required: + - address + - cacert + - clcert + - name + - role + type: object + status: + description: BrokerStatus defines the observed state of Broker. + properties: + expirationTime: + description: This field represents the expiration time of the Broker. + It is used to determine when the Broker is no longer valid. + type: string + lastUpdateTime: + description: This field represents the last update time of the Broker. + type: string + required: + - expirationTime + - lastUpdateTime + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/deployments/node/files/node-network-manager-ClusterRole.yaml b/deployments/node/files/node-network-manager-ClusterRole.yaml index 327aa536..5e53dcf7 100644 --- a/deployments/node/files/node-network-manager-ClusterRole.yaml +++ b/deployments/node/files/node-network-manager-ClusterRole.yaml @@ -1,4 +1,15 @@ rules: +- apiGroups: + - "" + resources: + - secrets + verbs: + - create + - delete + - get + - list + - update + - watch - apiGroups: - "" resources: @@ -15,6 +26,26 @@ rules: - get - list - watch +- apiGroups: + - network.fluidos.eu + resources: + - brokers + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - network.fluidos.eu + resources: + - brokers/status + verbs: + - get + - patch + - update - apiGroups: - network.fluidos.eu resources: diff --git a/deployments/node/samples/broker.yaml b/deployments/node/samples/broker.yaml new file mode 100644 index 00000000..97356290 --- /dev/null +++ b/deployments/node/samples/broker.yaml @@ -0,0 +1,26 @@ +--- +apiVersion: network.fluidos.eu/v1alpha1 +kind: Broker +metadata: + name: broker-sample + namespace: fluidos +spec: + name: fluidos.top-ix.org + address: fluidos.top-ix.org + # "publisher" only publisher + # "subscriber" only subscriber + # anything else both publisher AND subscriber + role: both + #secrets must be created from certificate and key provided by broker server + cacert: + apiVersion: v1 + kind: Secret + metadata: + name: broker-ca-secret + namespace: fluidos + clcert: + apiVersion: v1 + kind: Secret + metadata: + name: broker-client-secret + namespace: fluidos diff --git a/deployments/node/templates/fluidos-pre-install-hook-network.yaml b/deployments/node/templates/fluidos-pre-install-hook-network.yaml index 1017d874..5d0f0fc9 100644 --- a/deployments/node/templates/fluidos-pre-install-hook-network.yaml +++ b/deployments/node/templates/fluidos-pre-install-hook-network.yaml @@ -15,7 +15,7 @@ spec: config: '{ "cniVersion": "0.3.0", "type": "macvlan", - "master": "eth0", + "master": "{{ .Values.networkManager.config.netInterface }}", "mode": "bridge", "ipam": { "type": "host-local", diff --git a/deployments/node/values.yaml b/deployments/node/values.yaml index ab32e530..6fac4054 100644 --- a/deployments/node/values.yaml +++ b/deployments/node/values.yaml @@ -151,6 +151,7 @@ networkManager: multicast: address: "239.11.11.1" port: 4000 + netInterface: "eth0" provider: "your-provider" diff --git a/docs/implementation/components.md b/docs/implementation/components.md index 1428f632..31a4eb03 100644 --- a/docs/implementation/components.md +++ b/docs/implementation/components.md @@ -8,6 +8,7 @@ Let's see how each component has been implemented: - [**Peering Candidates**](#peering-candidates) - [**REAR Manager**](#rear-manager) - [**Contract Manager**](#contract-manager) +- [**Network Manager**](#network-manager) ## Local ResourceManager @@ -58,3 +59,14 @@ The Contract Manager is in charge of managing the reserve and purchase of resour - When a suitable peering candidate is identified and a Reservation is forged, the Contract Manager initiates the `Reserve` phase by sending a **RESERVE\_FLAVOUR** message. - Upon successful reservation of resources, it proceeds to the `Purchase` phase by sending a **PURCHASE\_FLAVOUR** message. Following this, it stores the contract received. + +## Network Manager + +The **Network Manager** is the component that allows the discovery of other FLUIDOS Nodes, both in the same LAN and in the WAN. +To do so it uses two CRDs: + +- Broker containing all the parameters to connect to a remote broker (address, certificates etc.). +- KnownCluster containing the parameters of the newly discovered FLUIDOS node. + +In the LAN case it uses a multicast approach, and for each detected node it creates a KnownCluster CR. +For the WAN case, as a Kubernetes Controller, it monitors the Broker CRs, once a new Broker is applied it will start the messages exchange. As in the multicast approach, KnownCluster CRs are created. diff --git a/docs/implementation/controllers.md b/docs/implementation/controllers.md index b96c924e..506e16fa 100644 --- a/docs/implementation/controllers.md +++ b/docs/implementation/controllers.md @@ -42,3 +42,13 @@ The Reservation controller, tasked with reconciliation on the `Reservation` obje The Allocation controller, tasked with reconciliation on the `Allocation` object, continuously monitors and manages its state to ensure alignment with the desired configuration. (To be implemented) + +## Network Controller (`network_controller.go`) + +The Network controller, tasked with reconciliation on the `Broker` object, continuously monitors and manages its state to ensure alignment with the desired configuration. A struct of type ClientBroker is used as an image of the `Broker`. It follows the following steps: + +1. After a reconcile is triggered checks if the `Broker` object has been deleted. +2. If so it performs the cleaning of the BrokerClient structure to disconnect from the broker gracefully and returns. +3. It checks if another BrokerClient with the same name already exists, if so updates the already existing BrokerClient. +4. If the name does not match, it checks the address, if it matches raises an error and deletes the `Broker`. +5. If no ClientBroker with same address or same name is found, it creates a new BrokerClient with the `Broker` data. diff --git a/docs/implementation/customresources.md b/docs/implementation/customresources.md index 579ffb07..9617a869 100644 --- a/docs/implementation/customresources.md +++ b/docs/implementation/customresources.md @@ -10,6 +10,8 @@ The following custom resources have been developed for the FLUIDOS Node: - [**PeeringCandidate**](./customresources.md#peeringcandidate) - [**Solver**](./customresources.md#solver) - [**Transaction**](./customresources.md#transaction) +- [**Broker**](./customresources.md#broker) +- [**KnownCluster**](./customresources.md#knowncluster) ## Discovery @@ -442,3 +444,48 @@ spec: storage: "0" startTime: "2023-11-16T16:16:44Z" ``` + +## Broker + +Here is a `Broker` sample: + +```yaml +apiVersion: network.fluidos.eu/v1alpha1 +kind: Broker +metadata: + name: brokera + namespace: fluidos +spec: + name: brokera + address: fluidos.top-ix.org + role: both + cacert: + apiVersion: v1 + kind: Secret + metadata: + name: brokera-ca-442 + namespace: fluidos + clcert: + apiVersion: v1 + kind: Secret + metadata: + name: brokera-cl-27348 + namespace: fluidos +``` + +## KnownCluster + +Here is a `KnownCluster` sample: + +```yaml +apiVersion: network.fluidos.eu/v1alpha1 +kind: KnownCluster +metadata: + name: knowncluster-sample + namespace: fluidos +spec: + address: 172.8.0.2:30001 +status: + expirationTime: "2024-01-01T09:00:10Z" + lastUpdateTime: "2024-01-01T09:00:00Z" +``` diff --git a/docs/installation/installation.md b/docs/installation/installation.md index 2139a640..9a596866 100644 --- a/docs/installation/installation.md +++ b/docs/installation/installation.md @@ -89,18 +89,45 @@ Once we have Liqo running, we can install the FLUIDOS Node component via helm: helm repo add fluidos https://fluidos-project.github.io/node/ -helm install node fluidos/node -n fluidos \ ---create-namespace -f ../../quickstart/utils/consumer-values.yaml \ ---set networkManager.configMaps.nodeIdentity.ip="LOCAL_K8S_CLUSTER_CP_IP:LOCAL_REAR_PORT"\ ---set networkManager.configMaps.providers.local="REMOTE_K8S_CLUSTER_CP_IP:REMOTE_REAR_PORT"\ ---wait +helm upgrade --install node fluidos/node \ + -n fluidos --version "$FLUIDOS_VERSION" \ + --create-namespace -f consumer-values.yaml \ + --set networkManager.configMaps.nodeIdentity.ip="$NODE_IP" \ + --set rearController.service.gateway.nodePort.port="$REAR_PORT" \ + --set networkManager.config.enableLocalDiscovery="$ENABLE_LOCAL_DISCOVERY" \ + --set networkManager.config.address.thirdOctet="$THIRD_OCTET" \ + --set networkManager.config.netInterface="$NET_INTERFACE" \ + --wait \ + --debug \ + --v=2 ``` -Due to the absence of the Network Manager component that enable the auto-discovery among FLUIDOS Nodes, we need to setup some parameters manually to allow Nodes discovery. - Here, the meaning of the various parameters: -- LOCAL_K8S_CLUSTER_CP_IP: The IP address of your local Kubernetes cluster Control Plane -- LOCAL_REAR_PORT: The port on which your local cluster uses the REAR protocol -- REMOTE_K8S_CLUSTER_CP_IP: It's the IP address of the remote Kubernetes cluster to which you want to negotiate Flavors thorugh REAR with. -- REMOTE_REAR_PORT: It's the port on which the remote cluster uses the REAR protocol. +- FLUIDOS_VERSION: The FLUIDOS Node version to be installed +- NODE_IP: The IP address of your local Kubernetes cluster Control Plane +- REAR_PORT: The port on which your local cluster uses the REAR protocol +- ENABLE_LOCAL_DISCOVERY: A flag that enables the Network Manager, which is the component advertising the local FLUIDOS Node into a LAN +- THIRD_OCTET: This is the third byte of the IP address used by Multus CNI for sending broadcast messages into the LAN. **Warning**: this parameters should be different for each FLUIDOS Node to be working (e.g. 1 for the 1st cluster, 2 for the 2nd cluster, etc.) +- NET_INTERFACE: The host network interface that Multus binds to + +### Broker CR creation + +To enable the Network Manager to discover FLUIDOS Nodes outside a LAN, you need to configure and apply a Broker CR. +The `broker-creation.sh` script simplifies this process by guiding you through the creation of the Broker YAML file. + +What you need: + +- Kubeconfig PATH + +- Broker's name (custom name of your choice) +- Broker's server address +- Client certificate (.pem) +- Client private key (.pem) +- Broker's Root certificate (.pem) +- Role (publisher ^ subscriber ^ both) + +Two Kubernetes Secrets are created from the certificates and key, one containing the client cert and private key, the other containing the root certificate of the remote broker. +The script will create and apply the .yaml. +To inspect the new Broker: `kubectl describe broker my-broker -n fluidos` +Once applied, the Network Manager Reconcile process starts, enabling message exchange. diff --git a/go.mod b/go.mod index bbf2b748..b046032f 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21.0 require ( github.com/gorilla/mux v1.8.0 github.com/liqotech/liqo v0.9.4 + github.com/rabbitmq/amqp091-go v1.10.0 google.golang.org/grpc v1.59.0-dev k8s.io/api v0.28.2 k8s.io/apimachinery v0.28.2 diff --git a/go.sum b/go.sum index e34ffa15..a250cf5d 100644 --- a/go.sum +++ b/go.sum @@ -121,6 +121,8 @@ github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdO github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg= github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= @@ -152,8 +154,8 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= -go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= -go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= diff --git a/pkg/network-manager/broker_client.go b/pkg/network-manager/broker_client.go new file mode 100644 index 00000000..ce3a4f0b --- /dev/null +++ b/pkg/network-manager/broker_client.go @@ -0,0 +1,370 @@ +// Copyright 2022-2025 FLUIDOS Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package networkmanager + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "encoding/pem" + "fmt" + "strings" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + corev1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + networkv1alpha1 "github.com/fluidos-project/node/apis/network/v1alpha1" + nodecorev1alpha1 "github.com/fluidos-project/node/apis/nodecore/v1alpha1" + "github.com/fluidos-project/node/pkg/utils/flags" + "github.com/fluidos-project/node/pkg/utils/getters" + "github.com/fluidos-project/node/pkg/utils/namings" + "github.com/fluidos-project/node/pkg/utils/resourceforge" +) + +// clusterRole +// +kubebuilder:rbac:groups=network.fluidos.eu,resources=brokers,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=network.fluidos.eu,resources=brokers/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch +// +kubebuilder:rbac:groups=core,resources=endpoints,verbs=get;list;watch +// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch + +// BrokerClient keeps all the necessary class data. +type BrokerClient struct { + ID *nodecorev1alpha1.NodeIdentity + ctx context.Context + canc context.CancelFunc + + subFlag bool + pubFlag bool + brokerName string + serverAddr string + clientCert *corev1.Secret + rootCert *corev1.Secret + + brokerConn *brokerConnection +} + +// BrokerConnection keeps all the broker connection data. +type brokerConnection struct { + amqpConn *amqp.Connection + amqpChan *amqp.Channel + exchangeName string + routingKey string + queueName string + inboundMsgs <-chan amqp.Delivery + outboundMsg []byte + confirms chan amqp.Confirmation +} + +// SetupBrokerClient sets the Broker Client from NM reconcile. +func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alpha1.Broker) error { + klog.Info("Setting up Broker Client routines") + + bc.ctx, bc.canc = context.WithCancel(context.Background()) + ctx := bc.ctx + var err error + + bc.ID = getters.GetNodeIdentity(ctx, cl) + if bc.ID == nil { + return fmt.Errorf("failed to get Node Identity") + } + + // Server address and broker name. + bc.brokerName = broker.Spec.Name + bc.serverAddr = broker.Spec.Address + + bc.brokerConn = &brokerConnection{} + bc.brokerConn.exchangeName = "DefaultPeerRequest" + + bc.brokerConn.outboundMsg, err = json.Marshal(bc.ID) + if err != nil { + return err + } + + switch role := broker.Spec.Role; role { + case "publisher": + bc.pubFlag = true + bc.subFlag = false + klog.Infof("brokerClient %s set as publisher only", bc.brokerName) + case "subscriber": + bc.pubFlag = false + bc.subFlag = true + klog.Infof("brokerClient %s set as subscriber only", bc.brokerName) + default: + bc.pubFlag = true + bc.subFlag = true + klog.Infof("brokerClient %s set as publisher and subscriber", bc.brokerName) + } + + // Certificates. + bc.clientCert = &corev1.Secret{} + bc.rootCert = &corev1.Secret{} + + klog.Infof("Root Secret Name: %s\n", broker.Spec.CaCert.Name) + klog.Infof("Client Secret Name: %s\n", broker.Spec.ClCert.Name) + secretNamespace := "fluidos" + + err = bc.extractSecret(cl, broker.Spec.ClCert.Name, secretNamespace, bc.clientCert) + if err != nil { + return err + } + err = bc.extractSecret(cl, broker.Spec.CaCert.Name, secretNamespace, bc.rootCert) + if err != nil { + return err + } + + // Extract certs and key. + clientCert, ok := bc.clientCert.Data["tls.crt"] + if !ok { + klog.Error("missing certificate: 'tls.crt' not found in clCert Data") + } + + clientKey, ok := bc.clientCert.Data["tls.key"] + if !ok { + klog.Error("missing key: 'tls.key' not found in clCert Data") + } + + caCertData, ok := bc.rootCert.Data["CA_cert.pem"] + if !ok { + klog.Error("missing certificate: 'tls.crt' not found in CACert Data") + } + + // Load client cert and privKey. + cert, err := tls.X509KeyPair(clientCert, clientKey) + if err != nil { + klog.Error("error X509KeyPair: %v", err) + return err + } + + // Load root cert. + caCertPool := x509.NewCertPool() + ok = caCertPool.AppendCertsFromPEM(caCertData) + if !ok { + klog.Error("AppendCertsFromPEM error: %v", ok) + } + + // Routing key for topic. + bc.brokerConn.routingKey, err = extractCNfromCert(&clientCert) + if err != nil { + klog.Error("Common Name extraction error: %v", err) + } + bc.brokerConn.queueName = bc.brokerConn.routingKey + + // TLS config. + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + ServerName: bc.serverAddr, + MinVersion: tls.VersionTLS12, + } + + err = bc.brokerConnectionConfig(tlsConfig) + + return err +} + +// ExecuteBrokerClient executes the Network Manager Broker routines. +func (bc *BrokerClient) ExecuteBrokerClient(cl client.Client) error { + // Start sending messages + klog.Info("executing broker client routines") + var err error + if bc.pubFlag { + go func() { + bc.publishOnBroker() + }() + } + + // Start receiving messages + if bc.subFlag { + go func() { + if err = bc.readMsgOnBroker(bc.ctx, cl); err != nil { + klog.ErrorS(err, "error receiving advertisement") + } + }() + } + return err +} + +func (bc *BrokerClient) publishOnBroker() { + ticker := time.NewTicker(10 * time.Second) + for { + select { + case <-ticker.C: + + // Pub on exchange + err := bc.brokerConn.amqpChan.Publish( + bc.brokerConn.exchangeName, + bc.brokerConn.routingKey, + true, // Mandatory: if not routable -> error + false, // Immediate + amqp.Publishing{ + ContentType: "application/json", + Body: bc.brokerConn.outboundMsg, + Expiration: "30000", // TTL ms + }) + if err != nil { + klog.Error("Error pub message: %v", err) + } + + select { + case confirm := <-bc.brokerConn.confirms: + if confirm.Ack { + klog.Info("Message successfully published!") + } else { + klog.Info("Message failed to publish!") + } + case <-time.After(5 * time.Second): // Timeout + klog.Info("No confirmation received, message status unknown.") + } + + case <-bc.ctx.Done(): + ticker.Stop() + klog.Info("Ticker stopped\n") + return + } + } +} + +func (bc *BrokerClient) readMsgOnBroker(ctx context.Context, cl client.Client) error { + klog.Info("Listening from Broker") + for d := range bc.brokerConn.inboundMsgs { + klog.Info("Received remote advertisement from BROKER\n") + var remote NetworkManager + err := json.Unmarshal(d.Body, &remote.ID) + if err != nil { + klog.Error("Error unmarshalling message: ", err) + continue + } + // Check if received advertisement is remote + if bc.ID.IP != remote.ID.IP { + // Create knownCluster CR + kc := &networkv1alpha1.KnownCluster{} + + if err := cl.Get(ctx, client.ObjectKey{Name: namings.ForgeKnownClusterName(remote.ID.NodeID), Namespace: flags.FluidosNamespace}, kc); err != nil { + if client.IgnoreNotFound(err) == nil { + klog.InfoS("KnownCluster not found: creating form Broker", remote.ID.NodeID) + + // Create new KnownCluster CR + if err := cl.Create(ctx, resourceforge.ForgeKnownCluster(remote.ID.NodeID, remote.ID.IP)); err != nil { + return err + } + klog.InfoS("KnownCluster created from Broker", "ID", remote.ID.NodeID) + } + } else { + klog.Info("KnownCluster already present: updating from Broker", remote.ID.NodeID) + kc.UpdateStatus() + + // Update fetched KnownCluster CR + err := cl.Status().Update(ctx, kc) + if err != nil { + return err + } + klog.InfoS("KnownCluster updated from Broker", "ID", kc.ObjectMeta.Name) + } + } + } + return nil +} + +func extractCNfromCert(certPEM *[]byte) (string, error) { + var err error + var cert *x509.Certificate + var CN = "" + + // Decode PEM cert + block, _ := pem.Decode(*certPEM) + if block == nil { + klog.Error("Error decoding certificate PEM in CN extraction") + } else { + // Parsing X.509 + cert, err = x509.ParseCertificate(block.Bytes) + if err != nil { + klog.Error("Error parsing certificate X.509 in CN extraction: %v", err) + } else { + CN = cert.Subject.CommonName + } + } + return strings.TrimSpace(CN), err +} + +func (bc *BrokerClient) brokerConnectionConfig(tlsConfig *tls.Config) error { + var err error + config := amqp.Config{ + SASL: []amqp.Authentication{&amqp.ExternalAuth{}}, // auth EXTERNAL + TLSClientConfig: tlsConfig, // config TLS + Vhost: "/", // vhost + Heartbeat: 10 * time.Second, // heartbeat + } + + // Config connection + serverURL := "amqps://" + bc.serverAddr + ":5671/" + + bc.brokerConn.amqpConn, err = amqp.DialConfig(serverURL, config) + if err != nil { + klog.Error("RabbitMQ connection error: %v", err) + return err + } + + // Channel creation + bc.brokerConn.amqpChan, err = bc.brokerConn.amqpConn.Channel() + if err != nil { + klog.Error("channel creation error: %v", err) + return err + } + + // Queue subscrition + bc.brokerConn.inboundMsgs, err = bc.brokerConn.amqpChan.Consume( + bc.brokerConn.queueName, // queue name + "", // consumer name (empty -> generated) + true, // AutoAck + false, // Exclusive: queue is accessible only from this consumer + true, // false, // NoLocal: does not receive selfpublished messages + false, // NoWait: server confirmation + nil, // Arguments + ) + if err != nil { + klog.Error("Error subscribing queue: %s", err) + return err + } + + // Write confirm broker + if err := bc.brokerConn.amqpChan.Confirm(false); err != nil { + klog.Error("Failed to enable publisher confirms: %v", err) + return err + } + + // Channels for write confirm + bc.brokerConn.confirms = bc.brokerConn.amqpChan.NotifyPublish(make(chan amqp.Confirmation, 1)) + + klog.InfoS("Node", "ID", bc.ID.NodeID, "Client Address", bc.ID.IP, "Server Address", bc.serverAddr, "RoutingKey", bc.brokerConn.routingKey) + + return nil +} + +func (bc *BrokerClient) extractSecret(cl client.Client, secretName, secretNamespace string, secretDest *corev1.Secret) error { + err := cl.Get(context.TODO(), client.ObjectKey{ + Name: secretName, + Namespace: secretNamespace, + }, secretDest) + if err != nil { + klog.Error("Error retrieving Secret: %v\n", err) + return err + } + return nil +} diff --git a/pkg/network-manager/network-controller.go b/pkg/network-manager/network-controller.go index ebc6a541..539d35df 100644 --- a/pkg/network-manager/network-controller.go +++ b/pkg/network-manager/network-controller.go @@ -1,4 +1,4 @@ -// Copyright 2022-2024 FLUIDOS Project +// Copyright 2022-2025 FLUIDOS Project // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -41,6 +41,7 @@ import ( // +kubebuilder:rbac:groups=network.fluidos.eu,resources=knownclusters/status,verbs=get;update;patch // +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch // +kubebuilder:rbac:groups=core,resources=endpoints,verbs=get;list;watch +// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;delete // NetworkManager keeps all the necessary class data. type NetworkManager struct { @@ -50,45 +51,104 @@ type NetworkManager struct { EnableLocalDiscovery bool } -// KnownClusterReconciler reconciles a KnownCluster object. -type KnownClusterReconciler struct { +// BrokerReconciler reconciles a Broker object. +type BrokerReconciler struct { client.Client - Scheme *runtime.Scheme + Scheme *runtime.Scheme + ActiveBrokers []*BrokerClient } -// Reconcile reconciles a KnownClusters from DiscoveredClustersList. -func (r *KnownClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := ctrl.LoggerFrom(ctx, "kowncluster", req.NamespacedName) +// Reconcile reconciles a Broker. +func (r *BrokerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + klog.InfoS("Reconcile triggered") + log := ctrl.LoggerFrom(ctx, "broker", req.NamespacedName) ctx = ctrl.LoggerInto(ctx, log) - klog.InfoS("Reconcile triggered", "context", ctx) + var broker networkv1alpha1.Broker + + if err := r.Get(ctx, req.NamespacedName, &broker); client.IgnoreNotFound(err) != nil { + klog.Errorf("Error when getting Broker %s before reconcile: %v", req.NamespacedName, err) + return ctrl.Result{}, err + } else if err != nil { + klog.Infof("Broker %s not found, probably deleted", req.NamespacedName) + + // Deleting BrokerClient + for i, brokerCl := range r.ActiveBrokers { + if brokerCl.brokerName == req.NamespacedName.Name { + klog.Infof("Deleting %s ", brokerCl.brokerName) + err = r.brokerDelete(brokerCl, i) + for i := range r.ActiveBrokers { + klog.Info(r.ActiveBrokers[i]) + } + if err != nil { + return ctrl.Result{}, err + } + } + } + return ctrl.Result{}, nil + } + + // If found in CR && found in slice -> update + found := false + sameAddr := false + for i, brokerCl := range r.ActiveBrokers { + if brokerCl.brokerName == broker.Spec.Name { + klog.Info("found brokerClient ", brokerCl.brokerName) + klog.Info("and Broker ", broker.Spec.Name) + found = true + + // Update + if err := r.brokerUpdate(&broker, brokerCl, i); err != nil { + klog.Error("brokerUpdate failed: %s", err) + if err = cleanBroker(ctx, r.Client, &broker); err != nil { + klog.Error("cleanBroker failed: %s", err) + } + return ctrl.Result{}, err + } + break + } else if brokerCl.serverAddr == broker.Spec.Address { + sameAddr = true + klog.Error("brokerUpdate failed: same address found in another BrokerClient") + if err := cleanBroker(ctx, r.Client, &broker); err != nil { + klog.Error("cleanBroker failed: %s", err) + } + } + } + // If found in CR && !found in slice && address is free -> create + if (!found) && (!sameAddr) { + // Create + if err := r.brokerCreate(&broker); err != nil { + klog.Error("brokerCreate failed: %s", err) + if err = cleanBroker(ctx, r.Client, &broker); err != nil { + klog.Error("cleanBroker failed: %s", err) + } + return ctrl.Result{}, err + } + } + klog.Infof("Reconciling Broker %s", req.NamespacedName) return ctrl.Result{}, nil } // SetupWithManager sets up the controller with the Manager. -func (r *KnownClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *BrokerReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). - For(&networkv1alpha1.KnownCluster{}). + For(&networkv1alpha1.Broker{}). Complete(r) } // Setup the Network Manager. func Setup(ctx context.Context, cl client.Client, nm *NetworkManager, cniInterface *string) error { klog.Info("Setting up Network Manager routines") - nodeIdentity := getters.GetNodeIdentity(ctx, cl) if nodeIdentity == nil { return fmt.Errorf("failed to get Node Identity") } - multicastAddress := os.Getenv("MULTICAST_ADDRESS") if multicastAddress == "" { return fmt.Errorf("failed to get multicast address") } - nm.ID = nodeIdentity nm.Multicast = multicastAddress - if nm.EnableLocalDiscovery { ifi, err := net.InterfaceByName(*cniInterface) if err != nil { @@ -97,9 +157,7 @@ func Setup(ctx context.Context, cl client.Client, nm *NetworkManager, cniInterfa nm.Iface = ifi klog.InfoS("Interface", "Name", ifi.Name, "MAC address", ifi.HardwareAddr) } - klog.InfoS("Node", "ID", nodeIdentity.NodeID, "Address", nodeIdentity.IP) - return nil } @@ -112,7 +170,6 @@ func Execute(ctx context.Context, cl client.Client, nm *NetworkManager) error { klog.ErrorS(err, "Error sending advertisemente") } }() - // Start receiving multicast messages go func() { if err := receiveMulticastMessage(ctx, cl, nm); err != nil { @@ -120,14 +177,12 @@ func Execute(ctx context.Context, cl client.Client, nm *NetworkManager) error { } }() } - // Do housekeeping go func() { if err := doHousekeeping(ctx, cl); err != nil { klog.ErrorS(err, "Error doing housekeeping") } }() - return nil } @@ -136,25 +191,21 @@ func sendMulticastMessage(ctx context.Context, nm *NetworkManager) error { if err != nil { return err } - laddr, err := nm.Iface.Addrs() if err != nil { return err } - dialer := &net.Dialer{ LocalAddr: &net.UDPAddr{ IP: laddr[0].(*net.IPNet).IP, Port: 0, }, } - conn, err := dialer.Dial("udp", nm.Multicast) if err != nil { return err } defer conn.Close() - ticker := time.NewTicker(5 * time.Second) for { select { @@ -182,7 +233,6 @@ func receiveMulticastMessage(ctx context.Context, cl client.Client, local *Netwo return err } defer conn.Close() - buffer := make([]byte, 1024) for { @@ -190,9 +240,7 @@ func receiveMulticastMessage(ctx context.Context, cl client.Client, local *Netwo if err != nil { return err } - var remote NetworkManager - err = json.Unmarshal(buffer[:n], &remote.ID) if err != nil { klog.Error("Error unmarshalling message: ", err) @@ -267,3 +315,54 @@ func doHousekeeping(ctx context.Context, cl client.Client) error { } } } + +// Update the clientBroker. +func (r *BrokerReconciler) brokerUpdate(broker *networkv1alpha1.Broker, brokerCl *BrokerClient, index int) error { + klog.Infof("updating broker: %s", brokerCl.brokerName) + if err := r.brokerDelete(brokerCl, index); err != nil { + return err + } + if err := r.brokerCreate(broker); err != nil { + return err + } + return nil +} + +// Create the clientBroker. +func (r *BrokerReconciler) brokerCreate(broker *networkv1alpha1.Broker) error { + var bc BrokerClient + var err error + if err = bc.SetupBrokerClient(r.Client, broker); err != nil { + return err + } + if err = bc.ExecuteBrokerClient(r.Client); err != nil { + return err + } + r.ActiveBrokers = append(r.ActiveBrokers, &bc) + return nil +} + +// Delete the clientBroker. +func (r *BrokerReconciler) brokerDelete(brokerCl *BrokerClient, index int) error { + if err := brokerCl.brokerConn.amqpChan.Close(); err != nil { + klog.Errorf("Failed to close channel for broker '%s': %v", brokerCl.brokerName, err) + return err + } + if err := brokerCl.brokerConn.amqpConn.Close(); err != nil { + klog.Errorf("Failed to close connection for broker '%s': %v", brokerCl.brokerName, err) + return err + } + brokerCl.canc() + r.ActiveBrokers = append(r.ActiveBrokers[:index], r.ActiveBrokers[index+1:]...) + return nil +} + +// Delete the Broker CR. +func cleanBroker(ctx context.Context, cl client.Client, broker *networkv1alpha1.Broker) error { + err := cl.Delete(ctx, broker) + if err != nil { + klog.Error("error during Broker deletion '%s': %w", broker.Spec.Name, err) + return err + } + return nil +} diff --git a/quickstart/utils/consumer-values-no-ad.yaml b/quickstart/utils/consumer-values-no-ad.yaml index bbb62840..981e2d44 100644 --- a/quickstart/utils/consumer-values-no-ad.yaml +++ b/quickstart/utils/consumer-values-no-ad.yaml @@ -144,7 +144,7 @@ networkManager: limits: {} requests: {} # -- The resource image to be used by the network-manager pod. - imageName: "ghcr.io/fluidos/network-manager" + imageName: "ghcr.io/fluidos-project/network-manager" config: enableLocalDiscovery: address: @@ -157,6 +157,7 @@ networkManager: multicast: address: "239.11.11.1" port: 4000 + netInterface: "eth0" webhook: # -- Enable the webhook server for the local-resource-manager. diff --git a/quickstart/utils/consumer-values.yaml b/quickstart/utils/consumer-values.yaml index e8cc9727..b33ad1b0 100644 --- a/quickstart/utils/consumer-values.yaml +++ b/quickstart/utils/consumer-values.yaml @@ -143,7 +143,7 @@ networkManager: limits: {} requests: {} # -- The resource image to be used by the network-manager pod. - imageName: "ghcr.io/fluidos/network-manager" + imageName: "ghcr.io/fluidos-project/network-manager" config: enableLocalDiscovery: address: @@ -156,6 +156,7 @@ networkManager: multicast: address: "239.11.11.1" port: 4000 + netInterface: "eth0" webhook: # -- Enable the webhook server for the local-resource-manager. diff --git a/quickstart/utils/provider-values-no-ad.yaml b/quickstart/utils/provider-values-no-ad.yaml index dd6b04c0..80212692 100644 --- a/quickstart/utils/provider-values-no-ad.yaml +++ b/quickstart/utils/provider-values-no-ad.yaml @@ -143,7 +143,7 @@ networkManager: limits: {} requests: {} # -- The resource image to be used by the network-manager pod. - imageName: "ghcr.io/fluidos/network-manager" + imageName: "ghcr.io/fluidos-project/network-manager" config: enableLocalDiscovery: address: @@ -156,6 +156,7 @@ networkManager: multicast: address: "239.11.11.1" port: 4000 + netInterface: "eth0" webhook: # -- Enable the webhook server for the local-resource-manager. diff --git a/quickstart/utils/provider-values.yaml b/quickstart/utils/provider-values.yaml index ff660679..3c6244a2 100644 --- a/quickstart/utils/provider-values.yaml +++ b/quickstart/utils/provider-values.yaml @@ -143,7 +143,7 @@ networkManager: limits: {} requests: {} # -- The resource image to be used by the network-manager pod. - imageName: "ghcr.io/fluidos/network-manager" + imageName: "ghcr.io/fluidos-project/network-manager" config: enableLocalDiscovery: address: @@ -156,6 +156,7 @@ networkManager: multicast: address: "239.11.11.1" port: 4000 + netInterface: "eth0" webhook: # -- Enable the webhook server for the local-resource-manager. diff --git a/tools/scripts/broker-creation.sh b/tools/scripts/broker-creation.sh new file mode 100755 index 00000000..af7598dc --- /dev/null +++ b/tools/scripts/broker-creation.sh @@ -0,0 +1,75 @@ +#!/usr/bin/bash + +read_input() { + local prompt="$1" + local var_name="$2" + + read -p "$prompt: " -r value + eval "$var_name=\"$value\"" +} + +if [[ -n "${KUBECONFIG}" ]]; then + kubeconfig=$KUBECONFIG +else read_input "KUBECONFIG not set, please set it" kubeconfig +fi + +broker_name="null" +address="null" +broker_ca_cert="null" +broker_client_cert="null" +broker_priv_key="null" +role="null" + +read_input "Broker's name of your choice" "broker_name" +read_input "Broker server address (must match certificate CN)" "address" +read_input ".pem ROOT certificate" "broker_ca_cert" +read_input ".pem client certificate" "broker_client_cert" +read_input ".pem private key" "broker_priv_key" +read_input "Type the role: publisher | subscriber | both" "role" + +broker_ca_secret="$broker_name"-ca-"$RANDOM" +broker_client_secret="$broker_name"-cl-"$RANDOM" + +#create the secrets +kubectl create secret tls $broker_client_secret --cert=$broker_client_cert --key=$broker_priv_key --namespace=fluidos --kubeconfig "$kubeconfig" +status=$? +if [ "$status" -ne 0 ]; then + exit 1 +fi + +kubectl create secret generic $broker_ca_secret --from-file=$broker_ca_cert --namespace=fluidos --kubeconfig "$kubeconfig" +status=$? +if [ "$status" -ne 0 ]; then + kubectl delete secret broker_client_secret -n fluidos + exit 1 +fi + +# cr yaml +cat < ./$broker_name.yaml +apiVersion: network.fluidos.eu/v1alpha1 +kind: Broker +metadata: + name: $broker_name + namespace: fluidos +spec: + name: $broker_name + address: $address + role: $role + cacert: + apiVersion: v1 + kind: Secret + metadata: + name: $broker_ca_secret + namespace: fluidos + clcert: + apiVersion: v1 + kind: Secret + metadata: + name: $broker_client_secret + namespace: fluidos +EOF + +if [ -f "$broker_name.yaml" ]; then + kubectl apply -f $broker_name.yaml + rm -f "$broker_name.yaml" +fi