diff --git a/cmd/root/root.go b/cmd/root/root.go index 4bd1d7601..73af4dc18 100644 --- a/cmd/root/root.go +++ b/cmd/root/root.go @@ -38,7 +38,6 @@ import ( "github.com/apache/apisix-ingress-controller/internal/controller/config" "github.com/apache/apisix-ingress-controller/internal/manager" "github.com/apache/apisix-ingress-controller/internal/version" - "github.com/api7/gopkg/pkg/log" ) type GatewayConfigsFlag struct { @@ -115,6 +114,7 @@ func newAPISIXIngressController() *cobra.Command { return err } +<<<<<<< HEAD // dashboard sdk log l, err := log.NewLogger( log.WithOutputFile("stderr"), @@ -126,6 +126,8 @@ func newAPISIXIngressController() *cobra.Command { } log.DefaultLogger = l +======= +>>>>>>> d9550d88 (chore: unify the logging component (#2584)) // controllers log core := zapcore.NewCore( zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()), diff --git a/internal/adc/cache/store.go b/internal/adc/cache/store.go index 70152266c..6c5a1f239 100644 --- a/internal/adc/cache/store.go +++ b/internal/adc/cache/store.go @@ -21,9 +21,8 @@ import ( "fmt" "sync" - "github.com/api7/gopkg/pkg/log" + "github.com/go-logr/logr" "github.com/google/uuid" - "go.uber.org/zap" adctypes "github.com/apache/apisix-ingress-controller/api/adc" "github.com/apache/apisix-ingress-controller/internal/controller/label" @@ -34,12 +33,14 @@ type Store struct { pluginMetadataMap map[string]adctypes.PluginMetadata sync.Mutex + log logr.Logger } -func NewStore() *Store { +func NewStore(log logr.Logger) *Store { return &Store{ cacheMap: make(map[string]Cache), pluginMetadataMap: make(map[string]adctypes.PluginMetadata), + log: log, } } @@ -55,7 +56,7 @@ func (s *Store) Insert(name string, resourceTypes []string, resources *adctypes. s.cacheMap[name] = db targetCache = s.cacheMap[name] } - log.Debugw("Inserting resources into cache for", zap.String("name", name)) + s.log.V(1).Info("Inserting resources into cache", "name", name, "resourceTypes", resourceTypes, "Labels", Labels) selector := &KindLabelSelector{ Kind: Labels[label.LabelKind], Name: Labels[label.LabelName], @@ -162,41 +163,41 @@ func (s *Store) Delete(name string, resourceTypes []string, Labels map[string]st case adctypes.TypeService: services, err := targetCache.ListServices(selector) if err != nil { - log.Errorw("failed to list services", zap.Error(err)) + s.log.Error(err, "failed to list services") } for _, service := range services { if err := targetCache.DeleteService(service); err != nil { - log.Errorw("failed to delete service", zap.Error(err), zap.String("service", service.ID)) + s.log.Error(err, "failed to delete service", "service", service.ID) } } case adctypes.TypeSSL: ssls, err := targetCache.ListSSL(selector) if err != nil { - log.Errorw("failed to list ssl", zap.Error(err)) + s.log.Error(err, "failed to list ssl") } for _, ssl := range ssls { if err := targetCache.DeleteSSL(ssl); err != nil { - log.Errorw("failed to delete ssl", zap.Error(err), zap.String("ssl", ssl.ID)) + s.log.Error(err, "failed to delete ssl", "ssl", ssl.ID) } } case adctypes.TypeConsumer: consumers, err := targetCache.ListConsumers(selector) if err != nil { - log.Errorw("failed to list consumers", zap.Error(err)) + s.log.Error(err, "failed to list consumers") } for _, consumer := range consumers { if err := targetCache.DeleteConsumer(consumer); err != nil { - log.Errorw("failed to delete consumer", zap.Error(err), zap.String("consumer", consumer.Username)) + s.log.Error(err, "failed to delete consumer", "consumer", consumer.Username) } } case adctypes.TypeGlobalRule: globalRules, err := targetCache.ListGlobalRules(selector) if err != nil { - log.Errorw("failed to list global rules", zap.Error(err)) + s.log.Error(err, "failed to list global rules") } for _, globalRule := range globalRules { if err := targetCache.DeleteGlobalRule(globalRule); err != nil { - log.Errorw("failed to delete global rule", zap.Error(err), zap.String("global rule", globalRule.ID)) + s.log.Error(err, "failed to delete global rule", "global rule", globalRule.ID) } } case adctypes.TypePluginMetadata: @@ -229,7 +230,7 @@ func (s *Store) GetResources(name string) (*adctypes.Resources, error) { } globalrule = adctypes.GlobalRule(merged) } - log.Debugw("get resources global rule items", zap.Any("globalRuleItems", globalRuleItems)) + s.log.V(1).Info("GetResources fetched global rule items", "items", globalRuleItems, "gobalrule", globalrule) if meta, ok := s.pluginMetadataMap[name]; ok { metadata = meta.DeepCopy() } diff --git a/internal/adc/client/client.go b/internal/adc/client/client.go index 8efb0610b..aa961853a 100644 --- a/internal/adc/client/client.go +++ b/internal/adc/client/client.go @@ -27,9 +27,8 @@ import ( "sync" "time" - "github.com/api7/gopkg/pkg/log" + "github.com/go-logr/logr" "github.com/pkg/errors" - "go.uber.org/zap" adctypes "github.com/apache/apisix-ingress-controller/api/adc" "github.com/apache/apisix-ingress-controller/internal/adc/cache" @@ -48,23 +47,33 @@ type Client struct { ConfigManager *common.ConfigManager[types.NamespacedNameKind, adctypes.Config] ADCDebugProvider *common.ADCDebugProvider + + log logr.Logger } -func New(mode string, timeout time.Duration) (*Client, error) { +func New(log logr.Logger, mode string, timeout time.Duration) (*Client, error) { serverURL := os.Getenv("ADC_SERVER_URL") if serverURL == "" { serverURL = defaultHTTPADCExecutorAddr } +<<<<<<< HEAD store := cache.NewStore() +======= + store := cache.NewStore(log) +>>>>>>> d9550d88 (chore: unify the logging component (#2584)) configManager := common.NewConfigManager[types.NamespacedNameKind, adctypes.Config]() - log.Infow("using HTTP ADC Executor", zap.String("server_url", serverURL)) + + logger := log.WithName("client") + logger.Info("ADC client initialized", "mode", mode) + return &Client{ Store: store, - executor: NewHTTPADCExecutor(serverURL, timeout), + executor: NewHTTPADCExecutor(log, serverURL, timeout), BackendMode: mode, ConfigManager: configManager, ADCDebugProvider: common.NewADCDebugProvider(store, configManager), + log: logger, }, nil } @@ -82,31 +91,31 @@ type StoreDelta struct { Applied map[types.NamespacedNameKind]adctypes.Config } -func (d *Client) applyStoreChanges(args Task, isDelete bool) (StoreDelta, error) { - d.mu.Lock() - defer d.mu.Unlock() +func (c *Client) applyStoreChanges(args Task, isDelete bool) (StoreDelta, error) { + c.mu.Lock() + defer c.mu.Unlock() var delta StoreDelta if isDelete { - delta.Deleted = d.ConfigManager.Get(args.Key) - d.ConfigManager.Delete(args.Key) + delta.Deleted = c.ConfigManager.Get(args.Key) + c.ConfigManager.Delete(args.Key) } else { - deleted := d.ConfigManager.Update(args.Key, args.Configs) + deleted := c.ConfigManager.Update(args.Key, args.Configs) delta.Deleted = deleted delta.Applied = args.Configs } for _, cfg := range delta.Deleted { - if err := d.Store.Delete(cfg.Name, args.ResourceTypes, args.Labels); err != nil { - log.Errorw("store delete failed", zap.Error(err), zap.Any("cfg", cfg), zap.Any("args", args)) + if err := c.Store.Delete(cfg.Name, args.ResourceTypes, args.Labels); err != nil { + c.log.Error(err, "store delete failed", "cfg", cfg, "args", args) return StoreDelta{}, errors.Wrap(err, fmt.Sprintf("store delete failed for config %s", cfg.Name)) } } for _, cfg := range delta.Applied { - if err := d.Insert(cfg.Name, args.ResourceTypes, args.Resources, args.Labels); err != nil { - log.Errorw("store insert failed", zap.Error(err), zap.Any("cfg", cfg), zap.Any("args", args)) + if err := c.Insert(cfg.Name, args.ResourceTypes, args.Resources, args.Labels); err != nil { + c.log.Error(err, "store insert failed", "cfg", cfg, "args", args) return StoreDelta{}, errors.Wrap(err, fmt.Sprintf("store insert failed for config %s", cfg.Name)) } } @@ -114,23 +123,23 @@ func (d *Client) applyStoreChanges(args Task, isDelete bool) (StoreDelta, error) return delta, nil } -func (d *Client) applySync(ctx context.Context, args Task, delta StoreDelta) error { - d.syncMu.RLock() - defer d.syncMu.RUnlock() +func (c *Client) applySync(ctx context.Context, args Task, delta StoreDelta) error { + c.syncMu.RLock() + defer c.syncMu.RUnlock() if len(delta.Deleted) > 0 { - if err := d.sync(ctx, Task{ + if err := c.sync(ctx, Task{ Name: args.Name, Labels: args.Labels, ResourceTypes: args.ResourceTypes, Configs: delta.Deleted, }); err != nil { - log.Warnw("failed to sync deleted configs", zap.Error(err)) + c.log.Error(err, "failed to sync deleted configs", "args", args, "delta", delta) } } if len(delta.Applied) > 0 { - return d.sync(ctx, Task{ + return c.sync(ctx, Task{ Name: args.Name, Labels: args.Labels, ResourceTypes: args.ResourceTypes, @@ -141,45 +150,45 @@ func (d *Client) applySync(ctx context.Context, args Task, delta StoreDelta) err return nil } -func (d *Client) Update(ctx context.Context, args Task) error { - delta, err := d.applyStoreChanges(args, false) +func (c *Client) Update(ctx context.Context, args Task) error { + delta, err := c.applyStoreChanges(args, false) if err != nil { return err } - return d.applySync(ctx, args, delta) + return c.applySync(ctx, args, delta) } -func (d *Client) UpdateConfig(ctx context.Context, args Task) error { - _, err := d.applyStoreChanges(args, false) +func (c *Client) UpdateConfig(ctx context.Context, args Task) error { + _, err := c.applyStoreChanges(args, false) return err } -func (d *Client) Delete(ctx context.Context, args Task) error { - delta, err := d.applyStoreChanges(args, true) +func (c *Client) Delete(ctx context.Context, args Task) error { + delta, err := c.applyStoreChanges(args, true) if err != nil { return err } - return d.applySync(ctx, args, delta) + return c.applySync(ctx, args, delta) } -func (d *Client) DeleteConfig(ctx context.Context, args Task) error { - _, err := d.applyStoreChanges(args, true) +func (c *Client) DeleteConfig(ctx context.Context, args Task) error { + _, err := c.applyStoreChanges(args, true) return err } func (c *Client) Sync(ctx context.Context) (map[string]types.ADCExecutionErrors, error) { c.syncMu.Lock() defer c.syncMu.Unlock() - log.Debug("syncing all resources") + c.log.Info("syncing all resources") configs := c.ConfigManager.List() if len(configs) == 0 { - log.Warn("no GatewayProxy configs provided") + c.log.Info("no GatewayProxy configs provided") return nil, nil } - log.Debugw("syncing resources with multiple configs", zap.Any("configs", configs)) + c.log.V(1).Info("syncing resources with multiple configs", "configs", configs) failedMap := map[string]types.ADCExecutionErrors{} var failedConfigs []string @@ -187,7 +196,7 @@ func (c *Client) Sync(ctx context.Context) (map[string]types.ADCExecutionErrors, name := config.Name resources, err := c.GetResources(name) if err != nil { - log.Errorw("failed to get resources from store", zap.String("name", name), zap.Error(err)) + c.log.Error(err, "failed to get resources from store", "name", name) failedConfigs = append(failedConfigs, name) continue } @@ -202,7 +211,7 @@ func (c *Client) Sync(ctx context.Context) (map[string]types.ADCExecutionErrors, }, Resources: resources, }); err != nil { - log.Errorw("failed to sync resources", zap.String("name", name), zap.Error(err)) + c.log.Error(err, "failed to sync resources", "name", name) failedConfigs = append(failedConfigs, name) var execErrs types.ADCExecutionErrors if errors.As(err, &execErrs) { @@ -221,10 +230,10 @@ func (c *Client) Sync(ctx context.Context) (map[string]types.ADCExecutionErrors, } func (c *Client) sync(ctx context.Context, task Task) error { - log.Debugw("syncing resources", zap.Any("task", task)) + c.log.V(1).Info("syncing resources", "task", task) if len(task.Configs) == 0 { - log.Warnw("no adc configs provided", zap.Any("task", task)) + c.log.Info("no adc configs provided") return nil } @@ -309,6 +318,7 @@ func (c *Client) sync(ctx context.Context, task Task) error { } pkgmetrics.RecordFileIODuration("prepare_sync_file", adctypes.StatusSuccess, time.Since(fileIOStart).Seconds()) defer cleanup() + c.log.V(1).Info("prepared sync file", "path", syncFilePath) args := BuildADCExecuteArgs(syncFilePath, task.Labels, task.ResourceTypes) @@ -326,7 +336,7 @@ func (c *Client) sync(ctx context.Context, task Task) error { status := adctypes.StatusSuccess if err != nil { status = "failure" - log.Errorw("failed to execute adc command", zap.Error(err), zap.Any("config", config)) + c.log.Error(err, "failed to execute adc command", "config", config) var execErr types.ADCExecutionError if errors.As(err, &execErr) { @@ -366,7 +376,5 @@ func prepareSyncFile(resources any) (string, func(), error) { return "", nil, err } - log.Debugw("generated adc file", zap.String("filename", tmpFile.Name()), zap.String("json", string(data))) - return tmpFile.Name(), cleanup, nil } diff --git a/internal/adc/client/executor.go b/internal/adc/client/executor.go index c5e16b543..0a28a6795 100644 --- a/internal/adc/client/executor.go +++ b/internal/adc/client/executor.go @@ -31,8 +31,7 @@ import ( "sync" "time" - "github.com/api7/gopkg/pkg/log" - "go.uber.org/zap" + "github.com/go-logr/logr" "k8s.io/utils/ptr" adctypes "github.com/apache/apisix-ingress-controller/api/adc" @@ -49,6 +48,7 @@ type ADCExecutor interface { type DefaultADCExecutor struct { sync.Mutex + log logr.Logger } func (e *DefaultADCExecutor) Execute(ctx context.Context, mode string, config adctypes.Config, args []string) error { @@ -62,7 +62,7 @@ func (e *DefaultADCExecutor) runADC(ctx context.Context, mode string, config adc for _, addr := range config.ServerAddrs { if err := e.runForSingleServerWithTimeout(ctx, addr, mode, config, args); err != nil { - log.Errorw("failed to run adc for server", zap.String("server", addr), zap.Error(err)) + e.log.Error(err, "failed to run adc for server", "server", addr) var execErr types.ADCExecutionServerAddrError if errors.As(err, &execErr) { execErrs.FailedErrors = append(execErrs.FailedErrors, execErr) @@ -102,9 +102,9 @@ func (e *DefaultADCExecutor) runForSingleServer(ctx context.Context, serverAddr, cmd.Stderr = &stderr cmd.Env = append(os.Environ(), env...) - log.Debugw("running adc command", - zap.String("command", strings.Join(cmd.Args, " ")), - zap.Strings("env", filterSensitiveEnv(env)), + e.log.V(1).Info("running adc command", + "command", strings.Join(cmd.Args, " "), + "env", filterSensitiveEnv(env), ) if err := cmd.Run(); err != nil { @@ -113,22 +113,21 @@ func (e *DefaultADCExecutor) runForSingleServer(ctx context.Context, serverAddr, result, err := e.handleOutput(stdout.Bytes()) if err != nil { - log.Errorw("failed to handle adc output", - zap.Error(err), - zap.String("stdout", stdout.String()), - zap.String("stderr", stderr.String()), - ) + e.log.Error(err, "failed to handle adc output", + "stdout", stdout.String(), + "stderr", stderr.String()) return fmt.Errorf("failed to handle adc output: %w", err) } if result.FailedCount > 0 && len(result.Failed) > 0 { - log.Errorw("adc sync failed", zap.Any("result", result)) + reason := result.Failed[0].Reason + e.log.Error(fmt.Errorf("adc sync failed: %s", reason), "adc sync failed", "result", result) return types.ADCExecutionServerAddrError{ ServerAddr: serverAddr, - Err: result.Failed[0].Reason, + Err: reason, FailedStatuses: result.Failed, } } - log.Debugw("adc sync success", zap.Any("result", result)) + e.log.V(1).Info("adc sync success", "result", result) return nil } @@ -160,28 +159,19 @@ func (e *DefaultADCExecutor) buildCmdError(runErr error, stdout, stderr []byte) if errMsg == "" { errMsg = string(stdout) } - log.Errorw("failed to run adc", - zap.Error(runErr), - zap.String("output", string(stdout)), - zap.String("stderr", string(stderr)), - ) + e.log.Error(runErr, "failed to run adc", "output", string(stdout), "stderr", string(stderr)) return errors.New("failed to sync resources: " + errMsg + ", exit err: " + runErr.Error()) } func (e *DefaultADCExecutor) handleOutput(output []byte) (*adctypes.SyncResult, error) { + e.log.V(1).Info("adc command output", "output", string(output)) var result adctypes.SyncResult - log.Debugw("adc output", zap.String("output", string(output))) if lines := bytes.Split(output, []byte{'\n'}); len(lines) > 0 { output = lines[len(lines)-1] } if err := json.Unmarshal(output, &result); err != nil { - log.Errorw("failed to unmarshal adc output", - zap.Error(err), - zap.String("stdout", string(output)), - ) - return nil, errors.New("failed to parse adc result: " + err.Error()) + return nil, errors.New("failed to unmarshal response: " + string(output) + ", err: " + err.Error()) } - return &result, nil } @@ -225,8 +215,10 @@ type ADCServerOpts struct { type HTTPADCExecutor struct { httpClient *http.Client serverURL string + log logr.Logger } +<<<<<<< HEAD // NewHTTPADCExecutor creates a new HTTPADCExecutor with the specified ADC Server URL func NewHTTPADCExecutor(serverURL string, timeout time.Duration) *HTTPADCExecutor { return &HTTPADCExecutor{ @@ -234,6 +226,35 @@ func NewHTTPADCExecutor(serverURL string, timeout time.Duration) *HTTPADCExecuto Timeout: timeout, }, serverURL: serverURL, +======= +// NewHTTPADCExecutor creates a new HTTPADCExecutor with the specified ADC Server URL. +// serverURL can be "http(s)://host:port" or "unix:///path/to/socket" or "unix:/path/to/socket". +func NewHTTPADCExecutor(log logr.Logger, serverURL string, timeout time.Duration) *HTTPADCExecutor { + httpClient := &http.Client{ + Timeout: timeout, + } + + if strings.HasPrefix(serverURL, "unix:") { + var socketPath string + if strings.HasPrefix(serverURL, "unix:///") { + socketPath = strings.TrimPrefix(serverURL, "unix://") + } else { + socketPath = strings.TrimPrefix(serverURL, "unix:") + } + transport := &http.Transport{ + DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, "unix", socketPath) + }, + } + httpClient.Transport = transport + serverURL = "http://unix" + } + + return &HTTPADCExecutor{ + httpClient: httpClient, + serverURL: serverURL, + log: log.WithName("executor"), +>>>>>>> d9550d88 (chore: unify the logging component (#2584)) } } @@ -254,11 +275,11 @@ func (e *HTTPADCExecutor) runHTTPSync(ctx context.Context, mode string, config a } return config.ServerAddrs }() - log.Debugw("running http sync", zap.Strings("serverAddrs", serverAddrs), zap.String("mode", mode)) + e.log.V(1).Info("running http sync", "serverAddrs", serverAddrs, "mode", mode) for _, addr := range serverAddrs { if err := e.runHTTPSyncForSingleServer(ctx, addr, mode, config, args); err != nil { - log.Errorw("failed to run http sync for server", zap.String("server", addr), zap.Error(err)) + e.log.Error(err, "failed to run http sync for server", "server", addr) var execErr types.ADCExecutionServerAddrError if errors.As(err, &execErr) { execErrs.FailedErrors = append(execErrs.FailedErrors, execErr) @@ -306,7 +327,7 @@ func (e *HTTPADCExecutor) runHTTPSyncForSingleServer(ctx context.Context, server } defer func() { if closeErr := resp.Body.Close(); closeErr != nil { - log.Warnw("failed to close response body", zap.Error(closeErr)) + e.log.Error(closeErr, "failed to close response body") } }() @@ -385,21 +406,21 @@ func (e *HTTPADCExecutor) buildHTTPRequest(ctx context.Context, serverAddr, mode }, } + e.log.V(1).Info("prepared request body", "body", reqBody) + jsonData, err := json.Marshal(reqBody) if err != nil { return nil, fmt.Errorf("failed to marshal request body: %w", err) } - log.Debugw("request body", zap.String("body", string(jsonData))) - - log.Debugw("sending HTTP request to ADC Server", - zap.String("url", e.serverURL+"/sync"), - zap.String("server", serverAddr), - zap.String("mode", mode), - zap.String("cacheKey", config.Name), - zap.Any("labelSelector", labels), - zap.Strings("includeResourceType", types), - zap.Bool("tlsSkipVerify", !tlsVerify), + e.log.V(1).Info("sending HTTP request to ADC Server", + "url", e.serverURL+"/sync", + "server", serverAddr, + "mode", mode, + "cacheKey", config.Name, + "labelSelector", labels, + "includeResourceType", types, + "tlsSkipVerify", !tlsVerify, ) // Create HTTP request @@ -419,10 +440,10 @@ func (e *HTTPADCExecutor) handleHTTPResponse(resp *http.Response, serverAddr str return fmt.Errorf("failed to read response body: %w", err) } - log.Debugw("received HTTP response from ADC Server", - zap.String("server", serverAddr), - zap.Int("status", resp.StatusCode), - zap.String("response", string(body)), + e.log.V(1).Info("received HTTP response from ADC Server", + "server", serverAddr, + "status", resp.StatusCode, + "response", string(body), ) // not only 200, HTTP 202 is also accepted @@ -436,23 +457,20 @@ func (e *HTTPADCExecutor) handleHTTPResponse(resp *http.Response, serverAddr str // Parse response body var result adctypes.SyncResult if err := json.Unmarshal(body, &result); err != nil { - log.Errorw("failed to unmarshal ADC Server response", - zap.Error(err), - zap.String("response", string(body)), - ) - return fmt.Errorf("failed to parse ADC Server response: %w", err) + return fmt.Errorf("failed to unmarshal response body: %s, err: %w", string(body), err) } // Check for sync failures if result.FailedCount > 0 && len(result.Failed) > 0 { - log.Errorw("ADC Server sync failed", zap.Any("result", result)) + reason := result.Failed[0].Reason + e.log.Error(fmt.Errorf("ADC Server sync failed: %s", reason), "ADC Server sync failed", "result", result) return types.ADCExecutionServerAddrError{ ServerAddr: serverAddr, - Err: result.Failed[0].Reason, + Err: reason, FailedStatuses: result.Failed, } } - log.Debugw("ADC Server sync success", zap.Any("result", result)) + e.log.V(1).Info("ADC Server sync success", "result", result) return nil } diff --git a/internal/adc/translator/apisixroute.go b/internal/adc/translator/apisixroute.go index 505ba35f9..08dda5f8c 100644 --- a/internal/adc/translator/apisixroute.go +++ b/internal/adc/translator/apisixroute.go @@ -23,9 +23,7 @@ import ( "fmt" "strconv" - "github.com/api7/gopkg/pkg/log" "github.com/pkg/errors" - "go.uber.org/zap" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -251,7 +249,7 @@ func (t *Translator) buildUpstream(tctx *provider.TranslateContext, service *adc } au, ok := tctx.Upstreams[upsNN] if !ok { - log.Debugw("failed to retrieve ApisixUpstream from tctx", zap.Any("ApisixUpstream", upsNN)) + t.Log.V(1).Info("failed to retrieve ApisixUpstream from tctx", "ApisixUpstream", upsNN.String()) continue } upstream, err := t.translateApisixUpstream(tctx, au) diff --git a/internal/adc/translator/apisixupstream.go b/internal/adc/translator/apisixupstream.go index b56791dc0..fb8af6912 100644 --- a/internal/adc/translator/apisixupstream.go +++ b/internal/adc/translator/apisixupstream.go @@ -21,9 +21,7 @@ import ( "cmp" "fmt" - "github.com/api7/gopkg/pkg/log" "github.com/pkg/errors" - "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -33,10 +31,56 @@ import ( "github.com/apache/apisix-ingress-controller/internal/utils" ) +<<<<<<< HEAD func (t *Translator) translateApisixUpstream(tctx *provider.TranslateContext, au *apiv2.ApisixUpstream) (ups *adc.Upstream, err error) { ups = adc.NewDefaultUpstream() for _, f := range []func(*apiv2.ApisixUpstream, *adc.Upstream) error{ patchApisixUpstreamBasics, +======= +func (t *Translator) translateApisixUpstream(tctx *provider.TranslateContext, au *apiv2.ApisixUpstream) (*adc.Upstream, error) { + return t.translateApisixUpstreamForPort(tctx, au, nil) +} + +func (t *Translator) translateApisixUpstreamForPort(tctx *provider.TranslateContext, au *apiv2.ApisixUpstream, port *int32) (*adc.Upstream, error) { + t.Log.V(1).Info("translating ApisixUpstream", "apisixupstream", au, "port", port) + + ups := adc.NewDefaultUpstream() + ups.Name = composeExternalUpstreamName(au) + maps.Copy(ups.Labels, au.Labels) + + // translateApisixUpstreamConfig translates the core upstream configuration fields + // from au.Spec.ApisixUpstreamConfig into the ADC upstream. + // + // Note: ExternalNodes is not part of ApisixUpstreamConfig but a separate field + // on ApisixUpstreamSpec, so it is handled separately in translateApisixUpstreamExternalNodes. + if err := translateApisixUpstreamConfig(tctx, &au.Spec.ApisixUpstreamConfig, ups); err != nil { + return nil, err + } + if err := translateApisixUpstreamExternalNodes(tctx, au, ups); err != nil { + return nil, err + } + + // If PortLevelSettings is configured and a specific port is provided, + // apply the ApisixUpstreamConfig for the matching port to the upstream. + if len(au.Spec.PortLevelSettings) > 0 && port != nil { + for _, pls := range au.Spec.PortLevelSettings { + if pls.Port != *port { + continue + } + if err := translateApisixUpstreamConfig(tctx, &pls.ApisixUpstreamConfig, ups); err != nil { + return nil, err + } + } + } + + t.Log.V(1).Info("translated ApisixUpstream", "upstream", ups) + + return ups, nil +} + +func translateApisixUpstreamConfig(tctx *provider.TranslateContext, config *apiv2.ApisixUpstreamConfig, ups *adc.Upstream) (err error) { + for _, f := range []func(*apiv2.ApisixUpstreamConfig, *adc.Upstream) error{ +>>>>>>> d9550d88 (chore: unify the logging component (#2584)) translateApisixUpstreamScheme, translateApisixUpstreamLoadBalancer, translateApisixUpstreamRetriesAndTimeout, diff --git a/internal/adc/translator/gateway.go b/internal/adc/translator/gateway.go index 43fc765f2..db2848452 100644 --- a/internal/adc/translator/gateway.go +++ b/internal/adc/translator/gateway.go @@ -24,9 +24,7 @@ import ( "fmt" "slices" - "github.com/api7/gopkg/pkg/log" "github.com/pkg/errors" - "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" @@ -57,7 +55,7 @@ func (t *Translator) TranslateGateway(tctx *provider.TranslateContext, obj *gate rk := utils.NamespacedNameKind(obj) gatewayProxy, ok := tctx.GatewayProxies[rk] if !ok { - log.Debugw("no GatewayProxy found for Gateway", zap.String("gateway", obj.Name)) + t.Log.V(1).Info("no GatewayProxy found for Gateway", "gateway", obj.Name) return result, nil } @@ -92,17 +90,18 @@ func (t *Translator) translateSecret(tctx *provider.TranslateContext, listener g Snis: []string{}, } name := listener.TLS.CertificateRefs[0].Name - secret := tctx.Secrets[types.NamespacedName{Namespace: ns, Name: string(ref.Name)}] + secretNN := types.NamespacedName{Namespace: ns, Name: string(ref.Name)} + secret := tctx.Secrets[secretNN] if secret == nil { continue } if secret.Data == nil { - log.Errorw("secret data is nil", zap.Any("secret", secret)) + t.Log.Error(errors.New("secret data is nil"), "failed to get secret data", "secret", secretNN) return nil, fmt.Errorf("no secret data found for %s/%s", ns, name) } cert, key, err := extractKeyPair(secret, true) if err != nil { - log.Errorw("failed to extract key pair", zap.Error(err), zap.Any("secret", secret)) + t.Log.Error(err, "extract key pair", "secret", secretNN) return nil, err } sslObj.Certificates = append(sslObj.Certificates, adctypes.Certificate{ @@ -118,14 +117,14 @@ func (t *Translator) translateSecret(tctx *provider.TranslateContext, listener g return nil, err } if len(hosts) == 0 { - log.Warnw("no valid hostname found in certificate", zap.String("secret", secret.Namespace+"/"+secret.Name)) + t.Log.Info("no valid hostname found in certificate", "secret", secretNN.String()) continue } sslObj.Snis = append(sslObj.Snis, hosts...) } // Note: use cert as id to avoid duplicate certificate across ssl objects sslObj.ID = id.GenID(string(cert)) - log.Debugw("generated ssl id", zap.String("ssl id", sslObj.ID), zap.String("secret", secret.Namespace+"/"+secret.Name)) + t.Log.V(1).Info("generated ssl id", "ssl id", sslObj.ID, "secret", secretNN.String()) sslObj.Labels = label.GenLabel(obj) sslObjs = append(sslObjs, sslObj) } @@ -219,13 +218,13 @@ func (t *Translator) fillPluginsFromGatewayProxy(plugins adctypes.GlobalRule, ga pluginConfig := map[string]any{} if len(plugin.Config.Raw) > 0 { if err := json.Unmarshal(plugin.Config.Raw, &pluginConfig); err != nil { - log.Errorw("gateway proxy plugin config unmarshal failed", zap.Error(err), zap.String("plugin", pluginName)) + t.Log.Error(err, "gateway proxy plugin config unmarshal failed", "plugin", pluginName) continue } } plugins[pluginName] = pluginConfig } - log.Debugw("fill plugins for gateway proxy", zap.Any("plugins", plugins)) + t.Log.V(1).Info("fill plugins for gateway proxy", "plugins", plugins) } func (t *Translator) fillPluginMetadataFromGatewayProxy(pluginMetadata adctypes.PluginMetadata, gatewayProxy *v1alpha1.GatewayProxy) { @@ -235,10 +234,10 @@ func (t *Translator) fillPluginMetadataFromGatewayProxy(pluginMetadata adctypes. for pluginName, plugin := range gatewayProxy.Spec.PluginMetadata { var pluginConfig map[string]any if err := json.Unmarshal(plugin.Raw, &pluginConfig); err != nil { - log.Errorw("gateway proxy plugin_metadata unmarshal failed", zap.Error(err), zap.Any("plugin", pluginName), zap.String("config", string(plugin.Raw))) + t.Log.Error(err, "gateway proxy plugin_metadata unmarshal failed", "plugin", pluginName, "config", string(plugin.Raw)) continue } - log.Debugw("fill plugin_metadata for gateway proxy", zap.String("plugin", pluginName), zap.Any("config", pluginConfig)) + t.Log.V(1).Info("fill plugin_metadata for gateway proxy", "plugin", pluginName, "config", pluginConfig) pluginMetadata[pluginName] = pluginConfig } } diff --git a/internal/adc/translator/gatewayproxy.go b/internal/adc/translator/gatewayproxy.go index 8b7fb673e..6636816ff 100644 --- a/internal/adc/translator/gatewayproxy.go +++ b/internal/adc/translator/gatewayproxy.go @@ -22,9 +22,7 @@ import ( "net" "strconv" - "github.com/api7/gopkg/pkg/log" "github.com/pkg/errors" - "go.uber.org/zap" corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" k8stypes "k8s.io/apimachinery/pkg/types" @@ -108,7 +106,7 @@ func (t *Translator) TranslateGatewayProxyToConfig(tctx *provider.TranslateConte }, }, func(endpoint *discoveryv1.Endpoint) bool { if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating { - log.Debugw("skip terminating endpoint", zap.Any("endpoint", endpoint)) + t.Log.V(1).Info("skip terminating endpoint", "endpoint", endpoint) return false } return true @@ -130,7 +128,7 @@ func (t *Translator) TranslateGatewayProxyToConfig(tctx *provider.TranslateConte config.ServerAddrs = []string{serverAddr} } - log.Debugw("add server address to config.ServiceAddrs", zap.Strings("config.ServerAddrs", config.ServerAddrs)) + t.Log.V(1).Info("add server address to config.ServiceAddrs", "config.ServerAddrs", config.ServerAddrs) } return &config, nil diff --git a/internal/adc/translator/globalrule.go b/internal/adc/translator/globalrule.go index 692321db4..89f1626ac 100644 --- a/internal/adc/translator/globalrule.go +++ b/internal/adc/translator/globalrule.go @@ -20,9 +20,6 @@ package translator import ( "encoding/json" - "github.com/api7/gopkg/pkg/log" - "go.uber.org/zap" - adctypes "github.com/apache/apisix-ingress-controller/api/adc" apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/provider" @@ -30,10 +27,7 @@ import ( // TranslateApisixGlobalRule translates ApisixGlobalRule to APISIX GlobalRule func (t *Translator) TranslateApisixGlobalRule(tctx *provider.TranslateContext, obj *apiv2.ApisixGlobalRule) (*TranslateResult, error) { - log.Debugw("translating ApisixGlobalRule", - zap.String("namespace", obj.Namespace), - zap.String("name", obj.Name), - ) + t.Log.V(1).Info("translating ApisixGlobalRule", "namespace", obj.Namespace, "name", obj.Name) // Create global rule plugins plugins := make(adctypes.Plugins) @@ -48,7 +42,7 @@ func (t *Translator) TranslateApisixGlobalRule(tctx *provider.TranslateContext, pluginConfig := make(map[string]any) if len(plugin.Config.Raw) > 0 { if err := json.Unmarshal(plugin.Config.Raw, &pluginConfig); err != nil { - log.Errorw("failed to unmarshal plugin config", zap.String("plugin", plugin.Name), zap.Error(err)) + t.Log.Error(err, "failed to unmarshal plugin config", "plugin", plugin.Name) continue } } diff --git a/internal/adc/translator/httproute.go b/internal/adc/translator/httproute.go index ddb5d329e..3fe621d1e 100644 --- a/internal/adc/translator/httproute.go +++ b/internal/adc/translator/httproute.go @@ -23,9 +23,7 @@ import ( "fmt" "strings" - "github.com/api7/gopkg/pkg/log" "github.com/pkg/errors" - "go.uber.org/zap" corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/types" @@ -85,13 +83,13 @@ func (t *Translator) fillPluginFromExtensionRef(plugins adctypes.Plugins, namesp pluginconfig := make(map[string]any) if len(plugin.Config.Raw) > 0 { if err := json.Unmarshal(plugin.Config.Raw, &pluginconfig); err != nil { - log.Errorw("plugin config unmarshal failed", zap.Error(err)) + t.Log.Error(err, "plugin config unmarshal failed", "plugin", plugin.Name) continue } } plugins[pluginName] = pluginconfig } - log.Debugw("fill plugin from extension ref", zap.Any("plugins", plugins)) + t.Log.V(1).Info("fill plugin from extension ref", "plugins", plugins) } } @@ -323,7 +321,7 @@ func (t *Translator) fillHTTPRoutePolicies(routes []*adctypes.Route, policies [] for _, data := range policy.Spec.Vars { var v []adctypes.StringOrSlice if err := json.Unmarshal(data.Raw, &v); err != nil { - log.Errorw("failed to unmarshal spec.Vars item to []StringOrSlice", zap.Error(err), zap.String("data", string(data.Raw))) + t.Log.Error(err, "failed to unmarshal spec.Vars item to []StringOrSlice", "data", string(data.Raw)) // todo: update status continue } @@ -345,6 +343,7 @@ func (t *Translator) translateEndpointSlice(portName *string, weight int, endpoi } for _, endpoint := range endpointSlice.Endpoints { if endpointFilter != nil && !endpointFilter(&endpoint) { + t.Log.V(1).Info("skip endpoint by filter", "endpoint", endpoint) continue } for _, addr := range endpoint.Addresses { @@ -367,7 +366,6 @@ func (t *Translator) translateEndpointSlice(portName *string, weight int, endpoi func DefaultEndpointFilter(endpoint *discoveryv1.Endpoint) bool { if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready { - log.Debugw("skip not ready endpoint", zap.Any("endpoint", endpoint)) return false } return true diff --git a/internal/adc/translator/ingressclass.go b/internal/adc/translator/ingressclass.go index d91425925..064bd77d9 100644 --- a/internal/adc/translator/ingressclass.go +++ b/internal/adc/translator/ingressclass.go @@ -18,8 +18,6 @@ package translator import ( - "github.com/api7/gopkg/pkg/log" - "go.uber.org/zap" networkingv1 "k8s.io/api/networking/v1" adctypes "github.com/apache/apisix-ingress-controller/api/adc" @@ -33,7 +31,7 @@ func (t *Translator) TranslateIngressClass(tctx *provider.TranslateContext, obj rk := utils.NamespacedNameKind(obj) gatewayProxy, ok := tctx.GatewayProxies[rk] if !ok { - log.Debugw("no GatewayProxy found for IngressClass", zap.String("ingressclass", obj.Name)) + t.Log.V(1).Info("no GatewayProxy found for IngressClass", "ingressclass", obj.Name) return result, nil } diff --git a/internal/adc/translator/translator.go b/internal/adc/translator/translator.go index 4c9bf0d84..aeaef2509 100644 --- a/internal/adc/translator/translator.go +++ b/internal/adc/translator/translator.go @@ -29,7 +29,7 @@ type Translator struct { func NewTranslator(log logr.Logger) *Translator { return &Translator{ - Log: log, + Log: log.WithName("translator"), } } diff --git a/internal/controller/apisixroute_controller.go b/internal/controller/apisixroute_controller.go index cb3a88f2b..97b126ccc 100644 --- a/internal/controller/apisixroute_controller.go +++ b/internal/controller/apisixroute_controller.go @@ -24,9 +24,7 @@ import ( "fmt" "slices" - "github.com/api7/gopkg/pkg/log" "github.com/go-logr/logr" - "go.uber.org/zap" corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" networkingv1 "k8s.io/api/networking/v1" @@ -441,9 +439,9 @@ func (r *ApisixRouteReconciler) validateHTTPBackend(tctx *provider.TranslateCont } // try to get apisixupstream with the same name as the backend service - log.Debugw("try to get apisixupstream with the same name as the backend service", zap.Stringer("Service", serviceNN)) + r.Log.V(1).Info("try to get apisixupstream with the same name as the backend service", "Service", serviceNN) if err := r.Get(tctx, serviceNN, &au); err != nil { - log.Debugw("no ApisixUpstream with the same name as the backend service found", zap.Stringer("Service", serviceNN), zap.Error(err)) + r.Log.V(1).Info("no ApisixUpstream with the same name as the backend service found", "Service", serviceNN, "Error", err) if err = client.IgnoreNotFound(err); err != nil { return err } diff --git a/internal/controller/gatewayproxy_controller.go b/internal/controller/gatewayproxy_controller.go index 2687b5632..fdd0346d4 100644 --- a/internal/controller/gatewayproxy_controller.go +++ b/internal/controller/gatewayproxy_controller.go @@ -136,7 +136,11 @@ func (r *GatewayProxyController) Reconcile(ctx context.Context, req ctrl.Request if providerService == nil { tctx.EndpointSlices[req.NamespacedName] = nil } else { +<<<<<<< HEAD serviceNN := k8stypes.NamespacedName{ +======= + if err := addProviderEndpointsToTranslateContext(tctx, r.Client, r.Log, types.NamespacedName{ +>>>>>>> d9550d88 (chore: unify the logging component (#2584)) Namespace: gp.Namespace, Name: providerService.Name, } diff --git a/internal/controller/grpcroute_controller.go b/internal/controller/grpcroute_controller.go index 138011a9e..80d30c02a 100644 --- a/internal/controller/grpcroute_controller.go +++ b/internal/controller/grpcroute_controller.go @@ -182,7 +182,7 @@ func (r *GRPCRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( msg: "Route is accepted", } - gateways, err := ParseRouteParentRefs(ctx, r.Client, gr, gr.Spec.ParentRefs) + gateways, err := ParseRouteParentRefs(ctx, r.Client, r.Log, gr, gr.Spec.ParentRefs) if err != nil { return ctrl.Result{}, err } diff --git a/internal/controller/httproute_controller.go b/internal/controller/httproute_controller.go index 84b588d2f..0927fb5f4 100644 --- a/internal/controller/httproute_controller.go +++ b/internal/controller/httproute_controller.go @@ -22,10 +22,8 @@ import ( "context" "fmt" - "github.com/api7/gopkg/pkg/log" "github.com/go-logr/logr" "github.com/pkg/errors" - "go.uber.org/zap" "golang.org/x/exp/slices" corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" @@ -187,7 +185,7 @@ func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( msg: "Route is accepted", } - gateways, err := ParseRouteParentRefs(ctx, r.Client, hr, hr.Spec.ParentRefs) + gateways, err := ParseRouteParentRefs(ctx, r.Client, r.Log, hr, hr.Spec.ParentRefs) if err != nil { return ctrl.Result{}, err } @@ -270,7 +268,7 @@ func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( if isRouteAccepted(gateways) && err == nil { routeToUpdate := hr if filteredHTTPRoute != nil { - log.Debugw("filteredHTTPRoute", zap.Any("filteredHTTPRoute", filteredHTTPRoute)) + r.Log.V(1).Info("filtered httproute", "httproute", filteredHTTPRoute) routeToUpdate = filteredHTTPRoute } if err := r.Provider.Update(ctx, tctx, routeToUpdate); err != nil { diff --git a/internal/controller/ingress_controller.go b/internal/controller/ingress_controller.go index ae20e5397..3d0a89e2a 100644 --- a/internal/controller/ingress_controller.go +++ b/internal/controller/ingress_controller.go @@ -22,9 +22,7 @@ import ( "fmt" "reflect" - "github.com/api7/gopkg/pkg/log" "github.com/go-logr/logr" - "go.uber.org/zap" corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" networkingv1 "k8s.io/api/networking/v1" @@ -516,7 +514,7 @@ func (r *IngressReconciler) processTLS(tctx *provider.TranslateContext, ingress } if secret.Data == nil { - log.Warnw("secret data is nil", zap.String("secret", secret.Namespace+"/"+secret.Name)) + r.Log.Error(fmt.Errorf("no secret data found"), "secret data is nil", "namespace", ingress.Namespace, "name", tls.SecretName) continue } @@ -611,7 +609,7 @@ func (r *IngressReconciler) updateStatus(ctx context.Context, tctx *provider.Tra gatewayProxy, ok := tctx.GatewayProxies[ingressClassKind] if !ok { - log.Debugw("no gateway proxy found for ingress class", zap.String("ingressClass", ingressClass.Name)) + r.Log.V(1).Info("no gateway proxy found for ingress class", "ingressClass", ingressClass.Name) return nil } diff --git a/internal/controller/ingressclass_controller.go b/internal/controller/ingressclass_controller.go index b0cea10e9..47f8980ce 100644 --- a/internal/controller/ingressclass_controller.go +++ b/internal/controller/ingressclass_controller.go @@ -173,3 +173,74 @@ func (r *IngressClassReconciler) listIngressClassesForSecret(ctx context.Context return distinctRequests(requests) } +<<<<<<< HEAD +======= + +func (r *IngressClassReconciler) processInfrastructure(tctx *provider.TranslateContext, ingressClass *networkingv1.IngressClass) error { + if ingressClass.Spec.Parameters == nil { + return nil + } + + if ingressClass.Spec.Parameters.APIGroup == nil || + *ingressClass.Spec.Parameters.APIGroup != v1alpha1.GroupVersion.Group || + ingressClass.Spec.Parameters.Kind != KindGatewayProxy { + return nil + } + + namespace := ingressClass.Namespace + if ingressClass.Spec.Parameters.Namespace != nil { + namespace = *ingressClass.Spec.Parameters.Namespace + } + + gatewayProxy := new(v1alpha1.GatewayProxy) + if err := r.Get(context.Background(), client.ObjectKey{ + Namespace: namespace, + Name: ingressClass.Spec.Parameters.Name, + }, gatewayProxy); err != nil { + return fmt.Errorf("failed to get gateway proxy: %w", err) + } + + rk := utils.NamespacedNameKind(ingressClass) + + tctx.GatewayProxies[rk] = *gatewayProxy + tctx.ResourceParentRefs[rk] = append(tctx.ResourceParentRefs[rk], rk) + + // Load secrets if needed + if gatewayProxy.Spec.Provider != nil && gatewayProxy.Spec.Provider.ControlPlane != nil { + auth := gatewayProxy.Spec.Provider.ControlPlane.Auth + if auth.Type == v1alpha1.AuthTypeAdminKey && auth.AdminKey != nil && auth.AdminKey.ValueFrom != nil { + if auth.AdminKey.ValueFrom.SecretKeyRef != nil { + secretRef := auth.AdminKey.ValueFrom.SecretKeyRef + secret := &corev1.Secret{} + if err := r.Get(context.Background(), client.ObjectKey{ + Namespace: namespace, + Name: secretRef.Name, + }, secret); err != nil { + r.Log.Error(err, "failed to get secret for gateway proxy", "namespace", namespace, "name", secretRef.Name) + return err + } + tctx.Secrets[client.ObjectKey{ + Namespace: namespace, + Name: secretRef.Name, + }] = secret + } + } + } + + if service := gatewayProxy.Spec.Provider.ControlPlane.Service; service != nil { + if err := addProviderEndpointsToTranslateContext(tctx, r.Client, r.Log, types.NamespacedName{ + Namespace: gatewayProxy.GetNamespace(), + Name: service.Name, + }); err != nil { + return err + } + } + + _, ok := tctx.GatewayProxies[rk] + if !ok { + return fmt.Errorf("no gateway proxy found for ingress class") + } + + return nil +} +>>>>>>> d9550d88 (chore: unify the logging component (#2584)) diff --git a/internal/controller/tcproute_controller.go b/internal/controller/tcproute_controller.go new file mode 100644 index 000000000..125a14a90 --- /dev/null +++ b/internal/controller/tcproute_controller.go @@ -0,0 +1,505 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package controller + +import ( + "cmp" + "context" + "fmt" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + k8stypes "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + "sigs.k8s.io/gateway-api/apis/v1beta1" + + "github.com/apache/apisix-ingress-controller/api/v1alpha1" + "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" + "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/types" + "github.com/apache/apisix-ingress-controller/internal/utils" +) + +// TCPRouteReconciler reconciles a TCPRoute object. +type TCPRouteReconciler struct { //nolint:revive + client.Client + Scheme *runtime.Scheme + + Log logr.Logger + + Provider provider.Provider + + Updater status.Updater + Readier readiness.ReadinessManager +} + +// SetupWithManager sets up the controller with the Manager. +func (r *TCPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { + + bdr := ctrl.NewControllerManagedBy(mgr). + For(&gatewayv1alpha2.TCPRoute{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). + Watches(&discoveryv1.EndpointSlice{}, + handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesByServiceRef), + ). + Watches(&gatewayv1.Gateway{}, + handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesForGateway), + builder.WithPredicates( + predicate.Funcs{ + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false + }, + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + return true + }, + }, + ), + ). + Watches(&v1alpha1.BackendTrafficPolicy{}, + handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesForBackendTrafficPolicy), + ). + Watches(&v1alpha1.GatewayProxy{}, + handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesForGatewayProxy), + ) + + if GetEnableReferenceGrant() { + bdr.Watches(&v1beta1.ReferenceGrant{}, + handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesForReferenceGrant), + builder.WithPredicates(referenceGrantPredicates(KindTCPRoute)), + ) + } + + return bdr.Complete(r) +} + +func (r *TCPRouteReconciler) listTCPRoutesForBackendTrafficPolicy(ctx context.Context, obj client.Object) []reconcile.Request { + policy, ok := obj.(*v1alpha1.BackendTrafficPolicy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy") + return nil + } + + tcprouteList := []gatewayv1alpha2.TCPRoute{} + for _, targetRef := range policy.Spec.TargetRefs { + service := &corev1.Service{} + if err := r.Get(ctx, client.ObjectKey{ + Namespace: policy.Namespace, + Name: string(targetRef.Name), + }, service); err != nil { + if client.IgnoreNotFound(err) != nil { + r.Log.Error(err, "failed to get service", "namespace", policy.Namespace, "name", targetRef.Name) + } + continue + } + tcprList := &gatewayv1alpha2.TCPRouteList{} + if err := r.List(ctx, tcprList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(policy.Namespace, string(targetRef.Name)), + }); err != nil { + r.Log.Error(err, "failed to list tcproutes by service reference", "service", targetRef.Name) + return nil + } + tcprouteList = append(tcprouteList, tcprList.Items...) + } + var namespacedNameMap = make(map[k8stypes.NamespacedName]struct{}) + requests := make([]reconcile.Request, 0, len(tcprouteList)) + for _, tr := range tcprouteList { + key := k8stypes.NamespacedName{ + Namespace: tr.Namespace, + Name: tr.Name, + } + if _, ok := namespacedNameMap[key]; !ok { + namespacedNameMap[key] = struct{}{} + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: tr.Namespace, + Name: tr.Name, + }, + }) + } + } + return requests +} + +func (r *TCPRouteReconciler) listTCPRoutesForGateway(ctx context.Context, obj client.Object) []reconcile.Request { + gateway, ok := obj.(*gatewayv1.Gateway) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to Gateway") + } + tcprList := &gatewayv1alpha2.TCPRouteList{} + if err := r.List(ctx, tcprList, client.MatchingFields{ + indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace, gateway.Name), + }); err != nil { + r.Log.Error(err, "failed to list tcproutes by gateway", "gateway", gateway.Name) + return nil + } + + requests := make([]reconcile.Request, 0, len(tcprList.Items)) + for _, tcr := range tcprList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: tcr.Namespace, + Name: tcr.Name, + }, + }) + } + return requests +} + +// listTCPRoutesForGatewayProxy list all TCPRoute resources that are affected by a given GatewayProxy +func (r *TCPRouteReconciler) listTCPRoutesForGatewayProxy(ctx context.Context, obj client.Object) []reconcile.Request { + gatewayProxy, ok := obj.(*v1alpha1.GatewayProxy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to GatewayProxy") + return nil + } + + namespace := gatewayProxy.GetNamespace() + name := gatewayProxy.GetName() + + // find all gateways that reference this gateway proxy + gatewayList := &gatewayv1.GatewayList{} + if err := r.List(ctx, gatewayList, client.MatchingFields{ + indexer.ParametersRef: indexer.GenIndexKey(namespace, name), + }); err != nil { + r.Log.Error(err, "failed to list gateways for gateway proxy", "gatewayproxy", gatewayProxy.GetName()) + return nil + } + + var requests []reconcile.Request + + // for each gateway, find all TCPRoute resources that reference it + for _, gateway := range gatewayList.Items { + tcpRouteList := &gatewayv1alpha2.TCPRouteList{} + if err := r.List(ctx, tcpRouteList, client.MatchingFields{ + indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace, gateway.Name), + }); err != nil { + r.Log.Error(err, "failed to list tcproutes for gateway", "gateway", gateway.Name) + continue + } + + for _, tcpRoute := range tcpRouteList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: tcpRoute.Namespace, + Name: tcpRoute.Name, + }, + }) + } + } + + return requests +} + +func (r *TCPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&gatewayv1alpha2.TCPRoute{}, req.NamespacedName) + tr := new(gatewayv1alpha2.TCPRoute) + if err := r.Get(ctx, req.NamespacedName, tr); err != nil { + if client.IgnoreNotFound(err) == nil { + tr.Namespace = req.Namespace + tr.Name = req.Name + + tr.TypeMeta = metav1.TypeMeta{ + Kind: KindTCPRoute, + APIVersion: gatewayv1alpha2.GroupVersion.String(), + } + + if err := r.Provider.Delete(ctx, tr); err != nil { + r.Log.Error(err, "failed to delete tcproute", "tcproute", tr) + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + type ResourceStatus struct { + status bool + msg string + } + + acceptStatus := ResourceStatus{ + status: true, + msg: "Route is accepted", + } + + gateways, err := ParseRouteParentRefs(ctx, r.Client, r.Log, tr, tr.Spec.ParentRefs) + if err != nil { + return ctrl.Result{}, err + } + + if len(gateways) == 0 { + return ctrl.Result{}, nil + } + + tctx := provider.NewDefaultTranslateContext(ctx) + + tctx.RouteParentRefs = tr.Spec.ParentRefs + rk := utils.NamespacedNameKind(tr) + for _, gateway := range gateways { + if err := ProcessGatewayProxy(r.Client, r.Log, tctx, gateway.Gateway, rk); err != nil { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + } + + var backendRefErr error + if err := r.processTCPRoute(tctx, tr); err != nil { + // When encountering a backend reference error, it should not affect the acceptance status + if types.IsSomeReasonError(err, gatewayv1.RouteReasonInvalidKind) { + backendRefErr = err + } else { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + } + + // Store the backend reference error for later use. + // If the backend reference error is because of an invalid kind, use this error first + if err := r.processTCPRouteBackendRefs(tctx, req.NamespacedName); err != nil && backendRefErr == nil { + backendRefErr = err + } + + ProcessBackendTrafficPolicy(r.Client, r.Log, tctx) + tr.Status.Parents = make([]gatewayv1.RouteParentStatus, 0, len(gateways)) + for _, gateway := range gateways { + parentStatus := gatewayv1.RouteParentStatus{} + SetRouteParentRef(&parentStatus, gateway.Gateway.Name, gateway.Gateway.Namespace) + for _, condition := range gateway.Conditions { + parentStatus.Conditions = MergeCondition(parentStatus.Conditions, condition) + } + SetRouteConditionAccepted(&parentStatus, tr.GetGeneration(), acceptStatus.status, acceptStatus.msg) + SetRouteConditionResolvedRefs(&parentStatus, tr.GetGeneration(), backendRefErr) + + tr.Status.Parents = append(tr.Status.Parents, parentStatus) + } + + r.Updater.Update(status.Update{ + NamespacedName: utils.NamespacedName(tr), + Resource: &gatewayv1alpha2.TCPRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + t, ok := obj.(*gatewayv1alpha2.TCPRoute) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + tCopy := t.DeepCopy() + tCopy.Status = tr.Status + return tCopy + }), + }) + UpdateStatus(r.Updater, r.Log, tctx) + if isRouteAccepted(gateways) { + routeToUpdate := tr + if err := r.Provider.Update(ctx, tctx, routeToUpdate); err != nil { + return ctrl.Result{}, err + } + } + return ctrl.Result{}, nil +} + +func (r *TCPRouteReconciler) processTCPRoute(tctx *provider.TranslateContext, tcpRoute *gatewayv1alpha2.TCPRoute) error { + var terror error + for _, rule := range tcpRoute.Spec.Rules { + for _, backend := range rule.BackendRefs { + if backend.Kind != nil && *backend.Kind != KindService { + terror = types.NewInvalidKindError(*backend.Kind) + continue + } + tctx.BackendRefs = append(tctx.BackendRefs, gatewayv1.BackendRef{ + BackendObjectReference: gatewayv1.BackendObjectReference{ + Name: backend.Name, + Namespace: cmp.Or(backend.Namespace, (*gatewayv1.Namespace)(&tcpRoute.Namespace)), + Port: backend.Port, + }, + }) + } + } + + return terror +} + +func (r *TCPRouteReconciler) processTCPRouteBackendRefs(tctx *provider.TranslateContext, trNN k8stypes.NamespacedName) error { + var terr error + for _, backend := range tctx.BackendRefs { + targetNN := k8stypes.NamespacedName{ + Namespace: trNN.Namespace, + Name: string(backend.Name), + } + if backend.Namespace != nil { + targetNN.Namespace = string(*backend.Namespace) + } + + if backend.Kind != nil && *backend.Kind != KindService { + terr = types.NewInvalidKindError(*backend.Kind) + continue + } + + if backend.Port == nil { + terr = fmt.Errorf("port is required") + continue + } + + var service corev1.Service + if err := r.Get(tctx, targetNN, &service); err != nil { + terr = err + if client.IgnoreNotFound(err) == nil { + terr = types.ReasonError{ + Reason: string(gatewayv1.RouteReasonBackendNotFound), + Message: fmt.Sprintf("Service %s not found", targetNN), + } + } + continue + } + + // if cross namespaces between TCPRoute and referenced Service, check ReferenceGrant + if trNN.Namespace != targetNN.Namespace { + if permitted := checkReferenceGrant(tctx, + r.Client, + v1beta1.ReferenceGrantFrom{ + Group: gatewayv1.GroupName, + Kind: KindTCPRoute, + Namespace: v1beta1.Namespace(trNN.Namespace), + }, + gatewayv1.ObjectReference{ + Group: corev1.GroupName, + Kind: KindService, + Name: gatewayv1.ObjectName(targetNN.Name), + Namespace: (*gatewayv1.Namespace)(&targetNN.Namespace), + }, + ); !permitted { + terr = types.ReasonError{ + Reason: string(v1beta1.RouteReasonRefNotPermitted), + Message: fmt.Sprintf("%s is in a different namespace than the TCPRoute %s and no ReferenceGrant allowing reference is configured", targetNN, trNN), + } + continue + } + } + + if service.Spec.Type == corev1.ServiceTypeExternalName { + tctx.Services[targetNN] = &service + continue + } + + portExists := false + for _, port := range service.Spec.Ports { + if port.Port == int32(*backend.Port) { + portExists = true + break + } + } + if !portExists { + terr = fmt.Errorf("port %d not found in service %s", *backend.Port, targetNN.Name) + continue + } + tctx.Services[targetNN] = &service + + endpointSliceList := new(discoveryv1.EndpointSliceList) + if err := r.List(tctx, endpointSliceList, + client.InNamespace(targetNN.Namespace), + client.MatchingLabels{ + discoveryv1.LabelServiceName: targetNN.Name, + }, + ); err != nil { + r.Log.Error(err, "failed to list endpoint slices", "Service", targetNN) + terr = err + continue + } + + tctx.EndpointSlices[targetNN] = endpointSliceList.Items + } + return terr +} + +func (r *TCPRouteReconciler) listTCPRoutesForReferenceGrant(ctx context.Context, obj client.Object) (requests []reconcile.Request) { + grant, ok := obj.(*v1beta1.ReferenceGrant) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to ReferenceGrant") + return nil + } + + var tcpRouteList gatewayv1alpha2.TCPRouteList + if err := r.List(ctx, &tcpRouteList); err != nil { + r.Log.Error(err, "failed to list tcproutes for reference ReferenceGrant", "ReferenceGrant", k8stypes.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()}) + return nil + } + + for _, tcpRoute := range tcpRouteList.Items { + tr := v1beta1.ReferenceGrantFrom{ + Group: gatewayv1.GroupName, + Kind: KindTCPRoute, + Namespace: v1beta1.Namespace(tcpRoute.GetNamespace()), + } + for _, from := range grant.Spec.From { + if from == tr { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: tcpRoute.GetNamespace(), + Name: tcpRoute.GetName(), + }, + }) + } + } + } + return requests +} + +func (r *TCPRouteReconciler) listTCPRoutesByServiceRef(ctx context.Context, obj client.Object) []reconcile.Request { + endpointSlice, ok := obj.(*discoveryv1.EndpointSlice) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to EndpointSlice") + return nil + } + namespace := endpointSlice.GetNamespace() + serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName] + + trList := &gatewayv1alpha2.TCPRouteList{} + if err := r.List(ctx, trList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(namespace, serviceName), + }); err != nil { + r.Log.Error(err, "failed to list tcproutes by service", "service", serviceName) + return nil + } + requests := make([]reconcile.Request, 0, len(trList.Items)) + for _, tr := range trList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: tr.Namespace, + Name: tr.Name, + }, + }) + } + return requests +} diff --git a/internal/controller/udproute_controller.go b/internal/controller/udproute_controller.go new file mode 100644 index 000000000..2a4a7a4a2 --- /dev/null +++ b/internal/controller/udproute_controller.go @@ -0,0 +1,505 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package controller + +import ( + "cmp" + "context" + "fmt" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + k8stypes "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + "sigs.k8s.io/gateway-api/apis/v1beta1" + + "github.com/apache/apisix-ingress-controller/api/v1alpha1" + "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" + "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/types" + "github.com/apache/apisix-ingress-controller/internal/utils" +) + +// UDPRouteReconciler reconciles a UDPRoute object. +type UDPRouteReconciler struct { //nolint:revive + client.Client + Scheme *runtime.Scheme + + Log logr.Logger + + Provider provider.Provider + + Updater status.Updater + Readier readiness.ReadinessManager +} + +// SetupWithManager sets up the controller with the Manager. +func (r *UDPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { + + bdr := ctrl.NewControllerManagedBy(mgr). + For(&gatewayv1alpha2.UDPRoute{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). + Watches(&discoveryv1.EndpointSlice{}, + handler.EnqueueRequestsFromMapFunc(r.listUDPRoutesByServiceRef), + ). + Watches(&gatewayv1.Gateway{}, + handler.EnqueueRequestsFromMapFunc(r.listUDPRoutesForGateway), + builder.WithPredicates( + predicate.Funcs{ + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false + }, + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + return true + }, + }, + ), + ). + Watches(&v1alpha1.BackendTrafficPolicy{}, + handler.EnqueueRequestsFromMapFunc(r.listUDPRoutesForBackendTrafficPolicy), + ). + Watches(&v1alpha1.GatewayProxy{}, + handler.EnqueueRequestsFromMapFunc(r.listUDPRoutesForGatewayProxy), + ) + + if GetEnableReferenceGrant() { + bdr.Watches(&v1beta1.ReferenceGrant{}, + handler.EnqueueRequestsFromMapFunc(r.listUDPRoutesForReferenceGrant), + builder.WithPredicates(referenceGrantPredicates(KindUDPRoute)), + ) + } + + return bdr.Complete(r) +} + +func (r *UDPRouteReconciler) listUDPRoutesForBackendTrafficPolicy(ctx context.Context, obj client.Object) []reconcile.Request { + policy, ok := obj.(*v1alpha1.BackendTrafficPolicy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy") + return nil + } + + udprouteList := []gatewayv1alpha2.UDPRoute{} + for _, targetRef := range policy.Spec.TargetRefs { + service := &corev1.Service{} + if err := r.Get(ctx, client.ObjectKey{ + Namespace: policy.Namespace, + Name: string(targetRef.Name), + }, service); err != nil { + if client.IgnoreNotFound(err) != nil { + r.Log.Error(err, "failed to get service", "namespace", policy.Namespace, "name", targetRef.Name) + } + continue + } + udprList := &gatewayv1alpha2.UDPRouteList{} + if err := r.List(ctx, udprList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(policy.Namespace, string(targetRef.Name)), + }); err != nil { + r.Log.Error(err, "failed to list udproutes by service reference", "service", targetRef.Name) + return nil + } + udprouteList = append(udprouteList, udprList.Items...) + } + var namespacedNameMap = make(map[k8stypes.NamespacedName]struct{}) + requests := make([]reconcile.Request, 0, len(udprouteList)) + for _, tr := range udprouteList { + key := k8stypes.NamespacedName{ + Namespace: tr.Namespace, + Name: tr.Name, + } + if _, ok := namespacedNameMap[key]; !ok { + namespacedNameMap[key] = struct{}{} + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: tr.Namespace, + Name: tr.Name, + }, + }) + } + } + return requests +} + +func (r *UDPRouteReconciler) listUDPRoutesForGateway(ctx context.Context, obj client.Object) []reconcile.Request { + gateway, ok := obj.(*gatewayv1.Gateway) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to Gateway") + } + udprList := &gatewayv1alpha2.UDPRouteList{} + if err := r.List(ctx, udprList, client.MatchingFields{ + indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace, gateway.Name), + }); err != nil { + r.Log.Error(err, "failed to list udproutes by gateway", "gateway", gateway.Name) + return nil + } + + requests := make([]reconcile.Request, 0, len(udprList.Items)) + for _, tcr := range udprList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: tcr.Namespace, + Name: tcr.Name, + }, + }) + } + return requests +} + +// listUDPRoutesForGatewayProxy list all UDPRoute resources that are affected by a given GatewayProxy +func (r *UDPRouteReconciler) listUDPRoutesForGatewayProxy(ctx context.Context, obj client.Object) []reconcile.Request { + gatewayProxy, ok := obj.(*v1alpha1.GatewayProxy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to GatewayProxy") + return nil + } + + namespace := gatewayProxy.GetNamespace() + name := gatewayProxy.GetName() + + // find all gateways that reference this gateway proxy + gatewayList := &gatewayv1.GatewayList{} + if err := r.List(ctx, gatewayList, client.MatchingFields{ + indexer.ParametersRef: indexer.GenIndexKey(namespace, name), + }); err != nil { + r.Log.Error(err, "failed to list gateways for gateway proxy", "gatewayproxy", gatewayProxy.GetName()) + return nil + } + + var requests []reconcile.Request + + // for each gateway, find all UDPRoute resources that reference it + for _, gateway := range gatewayList.Items { + udpRouteList := &gatewayv1alpha2.UDPRouteList{} + if err := r.List(ctx, udpRouteList, client.MatchingFields{ + indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace, gateway.Name), + }); err != nil { + r.Log.Error(err, "failed to list udproutes for gateway", "gateway", gateway.Name) + continue + } + + for _, udpRoute := range udpRouteList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: udpRoute.Namespace, + Name: udpRoute.Name, + }, + }) + } + } + + return requests +} + +func (r *UDPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&gatewayv1alpha2.UDPRoute{}, req.NamespacedName) + tr := new(gatewayv1alpha2.UDPRoute) + if err := r.Get(ctx, req.NamespacedName, tr); err != nil { + if client.IgnoreNotFound(err) == nil { + tr.Namespace = req.Namespace + tr.Name = req.Name + + tr.TypeMeta = metav1.TypeMeta{ + Kind: KindUDPRoute, + APIVersion: gatewayv1alpha2.GroupVersion.String(), + } + + if err := r.Provider.Delete(ctx, tr); err != nil { + r.Log.Error(err, "failed to delete udproute", "udproute", tr) + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + type ResourceStatus struct { + status bool + msg string + } + + acceptStatus := ResourceStatus{ + status: true, + msg: "Route is accepted", + } + + gateways, err := ParseRouteParentRefs(ctx, r.Client, r.Log, tr, tr.Spec.ParentRefs) + if err != nil { + return ctrl.Result{}, err + } + + if len(gateways) == 0 { + return ctrl.Result{}, nil + } + + tctx := provider.NewDefaultTranslateContext(ctx) + + tctx.RouteParentRefs = tr.Spec.ParentRefs + rk := utils.NamespacedNameKind(tr) + for _, gateway := range gateways { + if err := ProcessGatewayProxy(r.Client, r.Log, tctx, gateway.Gateway, rk); err != nil { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + } + + var backendRefErr error + if err := r.processUDPRoute(tctx, tr); err != nil { + // When encountering a backend reference error, it should not affect the acceptance status + if types.IsSomeReasonError(err, gatewayv1.RouteReasonInvalidKind) { + backendRefErr = err + } else { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + } + + // Store the backend reference error for later use. + // If the backend reference error is because of an invalid kind, use this error first + if err := r.processUDPRouteBackendRefs(tctx, req.NamespacedName); err != nil && backendRefErr == nil { + backendRefErr = err + } + + ProcessBackendTrafficPolicy(r.Client, r.Log, tctx) + tr.Status.Parents = make([]gatewayv1.RouteParentStatus, 0, len(gateways)) + for _, gateway := range gateways { + parentStatus := gatewayv1.RouteParentStatus{} + SetRouteParentRef(&parentStatus, gateway.Gateway.Name, gateway.Gateway.Namespace) + for _, condition := range gateway.Conditions { + parentStatus.Conditions = MergeCondition(parentStatus.Conditions, condition) + } + SetRouteConditionAccepted(&parentStatus, tr.GetGeneration(), acceptStatus.status, acceptStatus.msg) + SetRouteConditionResolvedRefs(&parentStatus, tr.GetGeneration(), backendRefErr) + + tr.Status.Parents = append(tr.Status.Parents, parentStatus) + } + + r.Updater.Update(status.Update{ + NamespacedName: utils.NamespacedName(tr), + Resource: &gatewayv1alpha2.UDPRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + t, ok := obj.(*gatewayv1alpha2.UDPRoute) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + tCopy := t.DeepCopy() + tCopy.Status = tr.Status + return tCopy + }), + }) + UpdateStatus(r.Updater, r.Log, tctx) + if isRouteAccepted(gateways) { + routeToUpdate := tr + if err := r.Provider.Update(ctx, tctx, routeToUpdate); err != nil { + return ctrl.Result{}, err + } + } + return ctrl.Result{}, nil +} + +func (r *UDPRouteReconciler) processUDPRoute(tctx *provider.TranslateContext, udpRoute *gatewayv1alpha2.UDPRoute) error { + var terror error + for _, rule := range udpRoute.Spec.Rules { + for _, backend := range rule.BackendRefs { + if backend.Kind != nil && *backend.Kind != KindService { + terror = types.NewInvalidKindError(*backend.Kind) + continue + } + tctx.BackendRefs = append(tctx.BackendRefs, gatewayv1.BackendRef{ + BackendObjectReference: gatewayv1.BackendObjectReference{ + Name: backend.Name, + Namespace: cmp.Or(backend.Namespace, (*gatewayv1.Namespace)(&udpRoute.Namespace)), + Port: backend.Port, + }, + }) + } + } + + return terror +} + +func (r *UDPRouteReconciler) processUDPRouteBackendRefs(tctx *provider.TranslateContext, trNN k8stypes.NamespacedName) error { + var terr error + for _, backend := range tctx.BackendRefs { + targetNN := k8stypes.NamespacedName{ + Namespace: trNN.Namespace, + Name: string(backend.Name), + } + if backend.Namespace != nil { + targetNN.Namespace = string(*backend.Namespace) + } + + if backend.Kind != nil && *backend.Kind != KindService { + terr = types.NewInvalidKindError(*backend.Kind) + continue + } + + if backend.Port == nil { + terr = fmt.Errorf("port is required") + continue + } + + var service corev1.Service + if err := r.Get(tctx, targetNN, &service); err != nil { + terr = err + if client.IgnoreNotFound(err) == nil { + terr = types.ReasonError{ + Reason: string(gatewayv1.RouteReasonBackendNotFound), + Message: fmt.Sprintf("Service %s not found", targetNN), + } + } + continue + } + + // if cross namespaces between UDPRoute and referenced Service, check ReferenceGrant + if trNN.Namespace != targetNN.Namespace { + if permitted := checkReferenceGrant(tctx, + r.Client, + v1beta1.ReferenceGrantFrom{ + Group: gatewayv1.GroupName, + Kind: KindUDPRoute, + Namespace: v1beta1.Namespace(trNN.Namespace), + }, + gatewayv1.ObjectReference{ + Group: corev1.GroupName, + Kind: KindService, + Name: gatewayv1.ObjectName(targetNN.Name), + Namespace: (*gatewayv1.Namespace)(&targetNN.Namespace), + }, + ); !permitted { + terr = types.ReasonError{ + Reason: string(v1beta1.RouteReasonRefNotPermitted), + Message: fmt.Sprintf("%s is in a different namespace than the UDPRoute %s and no ReferenceGrant allowing reference is configured", targetNN, trNN), + } + continue + } + } + + if service.Spec.Type == corev1.ServiceTypeExternalName { + tctx.Services[targetNN] = &service + continue + } + + portExists := false + for _, port := range service.Spec.Ports { + if port.Port == int32(*backend.Port) { + portExists = true + break + } + } + if !portExists { + terr = fmt.Errorf("port %d not found in service %s", *backend.Port, targetNN.Name) + continue + } + tctx.Services[targetNN] = &service + + endpointSliceList := new(discoveryv1.EndpointSliceList) + if err := r.List(tctx, endpointSliceList, + client.InNamespace(targetNN.Namespace), + client.MatchingLabels{ + discoveryv1.LabelServiceName: targetNN.Name, + }, + ); err != nil { + r.Log.Error(err, "failed to list endpoint slices", "Service", targetNN) + terr = err + continue + } + + tctx.EndpointSlices[targetNN] = endpointSliceList.Items + } + return terr +} + +func (r *UDPRouteReconciler) listUDPRoutesForReferenceGrant(ctx context.Context, obj client.Object) (requests []reconcile.Request) { + grant, ok := obj.(*v1beta1.ReferenceGrant) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to ReferenceGrant") + return nil + } + + var udpRouteList gatewayv1alpha2.UDPRouteList + if err := r.List(ctx, &udpRouteList); err != nil { + r.Log.Error(err, "failed to list udproutes for reference ReferenceGrant", "ReferenceGrant", k8stypes.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()}) + return nil + } + + for _, udpRoute := range udpRouteList.Items { + tr := v1beta1.ReferenceGrantFrom{ + Group: gatewayv1.GroupName, + Kind: KindUDPRoute, + Namespace: v1beta1.Namespace(udpRoute.GetNamespace()), + } + for _, from := range grant.Spec.From { + if from == tr { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: udpRoute.GetNamespace(), + Name: udpRoute.GetName(), + }, + }) + } + } + } + return requests +} + +func (r *UDPRouteReconciler) listUDPRoutesByServiceRef(ctx context.Context, obj client.Object) []reconcile.Request { + endpointSlice, ok := obj.(*discoveryv1.EndpointSlice) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to EndpointSlice") + return nil + } + namespace := endpointSlice.GetNamespace() + serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName] + + trList := &gatewayv1alpha2.UDPRouteList{} + if err := r.List(ctx, trList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(namespace, serviceName), + }); err != nil { + r.Log.Error(err, "failed to list udproutes by service", "service", serviceName) + return nil + } + requests := make([]reconcile.Request, 0, len(trList.Items)) + for _, tr := range trList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: tr.Namespace, + Name: tr.Name, + }, + }) + } + return requests +} diff --git a/internal/controller/utils.go b/internal/controller/utils.go index 72b8ab59c..7566647cf 100644 --- a/internal/controller/utils.go +++ b/internal/controller/utils.go @@ -28,10 +28,8 @@ import ( "slices" "strings" - "github.com/api7/gopkg/pkg/log" "github.com/go-logr/logr" "github.com/samber/lo" - "go.uber.org/zap" corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" networkingv1 "k8s.io/api/networking/v1" @@ -325,7 +323,11 @@ func SetRouteParentRef(routeParentStatus *gatewayv1.RouteParentStatus, gatewayNa } func ParseRouteParentRefs( - ctx context.Context, mgrc client.Client, route client.Object, parentRefs []gatewayv1.ParentReference, + ctx context.Context, + mgrc client.Client, + log logr.Logger, + route client.Object, + parentRefs []gatewayv1.ParentReference, ) ([]RouteParentRefContext, error) { gateways := make([]RouteParentRefContext, 0) for _, parentRef := range parentRefs { @@ -394,12 +396,10 @@ func ParseRouteParentRefs( listenerName = string(listener.Name) ok, err := routeMatchesListenerAllowedRoutes(ctx, mgrc, route, listener.AllowedRoutes, gateway.Namespace, parentRef.Namespace) if err != nil { - log.Warnw("failed matching listener to a route for gateway", - zap.String("listener", string(listener.Name)), - zap.String("route", route.GetName()), - zap.String("gateway", gateway.Name), - zap.Error(err), - ) + log.Error(err, "failed matching listener to a route for gateway", + "listener", string(listener.Name), + "route", route.GetName(), + "gateway", gateway.Name) } if !ok { reason = gatewayv1.RouteReasonNotAllowedByListeners @@ -972,7 +972,11 @@ func ProcessGatewayProxy(r client.Client, log logr.Logger, tctx *provider.Transl } if cp.Service != nil { +<<<<<<< HEAD serviceNN := k8stypes.NamespacedName{ +======= + if err := addProviderEndpointsToTranslateContext(tctx, r, log, k8stypes.NamespacedName{ +>>>>>>> d9550d88 (chore: unify the logging component (#2584)) Namespace: gatewayProxy.GetNamespace(), Name: cp.Service.Name, } @@ -1029,7 +1033,6 @@ func filterHostnames(gateways []RouteParentRefContext, httpRoute *gatewayv1.HTTP } } - log.Debugw("filtered hostnames", zap.Any("httpRouteHostnames", httpRoute.Spec.Hostnames), zap.Any("hostnames", filteredHostnames)) httpRoute.Spec.Hostnames = filteredHostnames return httpRoute, nil } @@ -1371,7 +1374,11 @@ func ProcessIngressClassParameters(tctx *provider.TranslateContext, c client.Cli // process control plane provider service if cp.Service != nil { +<<<<<<< HEAD serviceNN := k8stypes.NamespacedName{ +======= + if err := addProviderEndpointsToTranslateContext(tctx, c, log, client.ObjectKey{ +>>>>>>> d9550d88 (chore: unify the logging component (#2584)) Namespace: gatewayProxy.GetNamespace(), Name: cp.Service.Name, } @@ -1495,18 +1502,37 @@ func distinctRequests(requests []reconcile.Request) []reconcile.Request { return distinctRequests } -func addProviderEndpointsToTranslateContext(tctx *provider.TranslateContext, c client.Client, serviceNN k8stypes.NamespacedName) error { - log.Debugw("to process provider endpoints by provider.service", zap.Any("service", serviceNN)) +func addProviderEndpointsToTranslateContext(tctx *provider.TranslateContext, c client.Client, log logr.Logger, serviceNN k8stypes.NamespacedName) error { + log.V(1).Info("to process provider endpoints by provider.service", "service", serviceNN) var ( service corev1.Service ) if err := c.Get(tctx, serviceNN, &service); err != nil { - log.Errorw("failed to get service from GatewayProxy provider", zap.Error(err), zap.Any("key", serviceNN)) + log.Error(err, "failed to get service from GatewayProxy provider", "service", serviceNN) return err } tctx.Services[serviceNN] = &service +<<<<<<< HEAD return resolveServiceEndpoints(tctx, c, serviceNN, true, nil) +======= + // get es + var ( + esList discoveryv1.EndpointSliceList + ) + if err := c.List(tctx, &esList, + client.InNamespace(serviceNN.Namespace), + client.MatchingLabels{ + discoveryv1.LabelServiceName: serviceNN.Name, + }); err != nil { + log.Error(err, "failed to get endpoints for GatewayProxy provider", "endpoints", serviceNN) + + return err + } + tctx.EndpointSlices[serviceNN] = esList.Items + + return nil +>>>>>>> d9550d88 (chore: unify the logging component (#2584)) } func TypePredicate[T client.Object]() func(obj client.Object) bool { diff --git a/internal/manager/readiness/manager.go b/internal/manager/readiness/manager.go index 7ba4b83bb..eca380eb8 100644 --- a/internal/manager/readiness/manager.go +++ b/internal/manager/readiness/manager.go @@ -24,8 +24,7 @@ import ( "sync/atomic" "time" - "github.com/api7/gopkg/pkg/log" - "go.uber.org/zap" + "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" k8stypes "k8s.io/apimachinery/pkg/types" @@ -79,16 +78,19 @@ type readinessManager struct { done chan struct{} isReady atomic.Bool + + log logr.Logger } // ReadinessManager tracks readiness of specific resources across the cluster. -func NewReadinessManager(client client.Client) ReadinessManager { +func NewReadinessManager(client client.Client, log logr.Logger) ReadinessManager { return &readinessManager{ client: client, state: make(map[schema.GroupVersionKind]map[k8stypes.NamespacedName]struct{}), started: make(chan struct{}), done: make(chan struct{}), isReady: atomic.Bool{}, + log: log.WithName("readiness"), } } @@ -123,7 +125,7 @@ func (r *readinessManager) Start(ctx context.Context) error { }) } if len(expected) > 0 { - log.Debugw("registering readiness state", zap.Any("gvk", gvk), zap.Any("expected", expected)) + r.log.V(1).Info("registering readiness state", "gvk", gvk, "expected", expected) r.registerState(gvk, expected) } } diff --git a/internal/manager/run.go b/internal/manager/run.go index de64f62a2..89c242265 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -164,8 +164,13 @@ func Run(ctx context.Context, logger logr.Logger) error { return err } +<<<<<<< HEAD readier := readiness.NewReadinessManager(mgr.GetClient()) registerReadinessGVK(mgr, readier) +======= + readier := readiness.NewReadinessManager(mgr.GetClient(), logger) + registerReadinessGVK(mgr.GetClient(), readier) +>>>>>>> d9550d88 (chore: unify the logging component (#2584)) if err := mgr.Add(readier); err != nil { setupLog.Error(err, "unable to add readiness manager") @@ -179,12 +184,13 @@ func Run(ctx context.Context, logger logr.Logger) error { providerType := string(config.ControllerConfig.ProviderConfig.Type) - provider, err := provider.New(providerType, updater.Writer(), readier, &provider.Options{ + providerOptions := &provider.Options{ SyncTimeout: config.ControllerConfig.ExecADCTimeout.Duration, SyncPeriod: config.ControllerConfig.ProviderConfig.SyncPeriod.Duration, InitSyncDelay: config.ControllerConfig.ProviderConfig.InitSyncDelay.Duration, BackendMode: string(config.ControllerConfig.ProviderConfig.Type), - }) + } + provider, err := provider.New(providerType, logger, updater.Writer(), readier, providerOptions) if err != nil { setupLog.Error(err, "unable to create provider") return err diff --git a/internal/provider/apisix/provider.go b/internal/provider/apisix/provider.go index 7f06fdbd1..687474031 100644 --- a/internal/provider/apisix/provider.go +++ b/internal/provider/apisix/provider.go @@ -23,8 +23,7 @@ import ( "sync" "time" - "github.com/api7/gopkg/pkg/log" - "go.uber.org/zap" + "github.com/go-logr/logr" networkingv1 "k8s.io/api/networking/v1" networkingv1beta1 "k8s.io/api/networking/v1beta1" "sigs.k8s.io/controller-runtime/pkg/client" @@ -68,16 +67,17 @@ type apisixProvider struct { syncCh chan struct{} client *adcclient.Client + log logr.Logger } -func New(updater status.Updater, readier readiness.ReadinessManager, opts ...provider.Option) (provider.Provider, error) { +func New(log logr.Logger, updater status.Updater, readier readiness.ReadinessManager, opts ...provider.Option) (provider.Provider, error) { o := provider.Options{} o.ApplyOptions(opts) if o.BackendMode == "" { o.BackendMode = ProviderTypeAPISIX } - cli, err := adcclient.New(o.BackendMode, o.SyncTimeout) + cli, err := adcclient.New(log, o.BackendMode, o.SyncTimeout) if err != nil { return nil, err } @@ -85,10 +85,11 @@ func New(updater status.Updater, readier readiness.ReadinessManager, opts ...pro return &apisixProvider{ client: cli, Options: o, - translator: &translator.Translator{}, + translator: translator.NewTranslator(log), updater: updater, readier: readier, syncCh: make(chan struct{}, 1), + log: log.WithName("provider"), }, nil } @@ -97,7 +98,7 @@ func (d *apisixProvider) Register(pathPrefix string, mux *http.ServeMux) { } func (d *apisixProvider) Update(ctx context.Context, tctx *provider.TranslateContext, obj client.Object) error { - log.Debugw("updating object", zap.Any("object", obj)) + d.log.V(1).Info("updating object", "object", obj) var ( result *translator.TranslateResult resourceTypes []string @@ -176,13 +177,13 @@ func (d *apisixProvider) Update(ctx context.Context, tctx *provider.TranslateCon Consumers: result.Consumers, }, } - log.Debugw("updating config", zap.Any("task", task)) + d.log.V(1).Info("updating config", "task", task) return d.client.UpdateConfig(ctx, task) } func (d *apisixProvider) Delete(ctx context.Context, obj client.Object) error { - log.Debugw("deleting object", zap.Any("object", obj)) + d.log.V(1).Info("deleting object", "object", obj) var resourceTypes []string var labels map[string]string @@ -270,7 +271,7 @@ func (d *apisixProvider) Start(ctx context.Context) error { return nil } if err := d.sync(ctx); err != nil { - log.Error(err) + d.log.Error(err, "failed to sync") retrier.Next() } else { retrier.Reset() @@ -294,7 +295,7 @@ func (d *apisixProvider) syncNotify() { func (d *apisixProvider) handleADCExecutionErrors(statusesMap map[string]types.ADCExecutionErrors) { statusUpdateMap := d.resolveADCExecutionErrors(statusesMap) d.handleStatusUpdate(statusUpdateMap) - log.Debugw("handled ADC execution errors", zap.Any("status_record", statusesMap), zap.Any("status_update", statusUpdateMap)) + d.log.V(1).Info("handled ADC execution errors", "status_record", statusesMap, "status_update", statusUpdateMap) } func (d *apisixProvider) NeedLeaderElection() bool { diff --git a/internal/provider/apisix/status.go b/internal/provider/apisix/status.go index d4b1e8993..2af898c75 100644 --- a/internal/provider/apisix/status.go +++ b/internal/provider/apisix/status.go @@ -21,8 +21,6 @@ import ( "fmt" "strings" - "github.com/api7/gopkg/pkg/log" - "go.uber.org/zap" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" @@ -111,7 +109,7 @@ func (d *apisixProvider) updateStatus(nnk types.NamespacedNameKind, condition me }) case types.KindHTTPRoute: parentRefs := d.client.ConfigManager.GetConfigRefsByResourceKey(nnk) - log.Debugw("updating HTTPRoute status", zap.Any("parentRefs", parentRefs)) + d.log.V(1).Info("updating HTTPRoute status", "parentRefs", parentRefs) gatewayRefs := map[types.NamespacedNameKind]struct{}{} for _, parentRef := range parentRefs { if parentRef.Kind == types.KindGateway { @@ -144,9 +142,82 @@ func (d *apisixProvider) updateStatus(nnk types.NamespacedNameKind, condition me return cp }), }) +<<<<<<< HEAD +======= + case types.KindUDPRoute: + parentRefs := d.client.ConfigManager.GetConfigRefsByResourceKey(nnk) + d.log.V(1).Info("updating UDPRoute status", "parentRefs", parentRefs) + gatewayRefs := map[types.NamespacedNameKind]struct{}{} + for _, parentRef := range parentRefs { + if parentRef.Kind == types.KindGateway { + gatewayRefs[parentRef] = struct{}{} + } + } + d.updater.Update(status.Update{ + NamespacedName: nnk.NamespacedName(), + Resource: &gatewayv1alpha2.UDPRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + cp := obj.(*gatewayv1alpha2.UDPRoute).DeepCopy() + gatewayNs := cp.GetNamespace() + for i, ref := range cp.Status.Parents { + ns := gatewayNs + if ref.ParentRef.Namespace != nil { + ns = string(*ref.ParentRef.Namespace) + } + if ref.ParentRef.Kind == nil || *ref.ParentRef.Kind == types.KindGateway { + nnk := types.NamespacedNameKind{ + Name: string(ref.ParentRef.Name), + Namespace: ns, + Kind: types.KindGateway, + } + if _, ok := gatewayRefs[nnk]; ok { + ref.Conditions = cutils.MergeCondition(ref.Conditions, condition) + cp.Status.Parents[i] = ref + } + } + } + return cp + }), + }) + case types.KindTCPRoute: + parentRefs := d.client.ConfigManager.GetConfigRefsByResourceKey(nnk) + d.log.V(1).Info("updating TCPRoute status", "parentRefs", parentRefs) + gatewayRefs := map[types.NamespacedNameKind]struct{}{} + for _, parentRef := range parentRefs { + if parentRef.Kind == types.KindGateway { + gatewayRefs[parentRef] = struct{}{} + } + } + d.updater.Update(status.Update{ + NamespacedName: nnk.NamespacedName(), + Resource: &gatewayv1alpha2.TCPRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + cp := obj.(*gatewayv1alpha2.TCPRoute).DeepCopy() + gatewayNs := cp.GetNamespace() + for i, ref := range cp.Status.Parents { + ns := gatewayNs + if ref.ParentRef.Namespace != nil { + ns = string(*ref.ParentRef.Namespace) + } + if ref.ParentRef.Kind == nil || *ref.ParentRef.Kind == types.KindGateway { + nnk := types.NamespacedNameKind{ + Name: string(ref.ParentRef.Name), + Namespace: ns, + Kind: types.KindGateway, + } + if _, ok := gatewayRefs[nnk]; ok { + ref.Conditions = cutils.MergeCondition(ref.Conditions, condition) + cp.Status.Parents[i] = ref + } + } + } + return cp + }), + }) +>>>>>>> d9550d88 (chore: unify the logging component (#2584)) case types.KindGRPCRoute: parentRefs := d.client.ConfigManager.GetConfigRefsByResourceKey(nnk) - log.Debugw("updating GRPCRoute status", zap.Any("parentRefs", parentRefs)) + d.log.V(1).Info("updating GRPCRoute status", "parentRefs", parentRefs) gatewayRefs := map[types.NamespacedNameKind]struct{}{} for _, parentRef := range parentRefs { if parentRef.Kind == types.KindGateway { @@ -208,7 +279,7 @@ func (d *apisixProvider) handleEmptyFailedStatuses( ) { resource, err := d.client.GetResources(configName) if err != nil { - log.Errorw("failed to get resources from store", zap.String("configName", configName), zap.Error(err)) + d.log.Error(err, "failed to get resources from store", "configName", configName) return } @@ -226,7 +297,7 @@ func (d *apisixProvider) handleEmptyFailedStatuses( globalRules, err := d.client.ListGlobalRules(configName) if err != nil { - log.Errorw("failed to list global rules", zap.String("configName", configName), zap.Error(err)) + d.log.Error(err, "failed to list global rules", "configName", configName) return } for _, rule := range globalRules { @@ -249,11 +320,10 @@ func (d *apisixProvider) handleDetailedFailedStatuses( id := status.Event.ResourceID labels, err := d.client.GetResourceLabel(configName, status.Event.ResourceType, id) if err != nil { - log.Errorw("failed to get resource label", - zap.String("configName", configName), - zap.String("resourceType", status.Event.ResourceType), - zap.String("id", id), - zap.Error(err), + d.log.Error(err, "failed to get resource label", + "configName", configName, + "resourceType", status.Event.ResourceType, + "id", id, ) continue } diff --git a/internal/provider/init/init.go b/internal/provider/init/init.go index efd5b43d1..7c807b847 100644 --- a/internal/provider/init/init.go +++ b/internal/provider/init/init.go @@ -23,14 +23,28 @@ import ( "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/provider/api7ee" "github.com/apache/apisix-ingress-controller/internal/provider/apisix" + "github.com/go-logr/logr" ) func init() { provider.Register("apisix", apisix.New) +<<<<<<< HEAD provider.Register("apisix-standalone", func(statusUpdater status.Updater, readinessManager readiness.ReadinessManager, opts ...provider.Option) (provider.Provider, error) { opts = append(opts, provider.WithBackendMode("apisix-standalone")) opts = append(opts, provider.WithResolveEndpoints()) return apisix.New(statusUpdater, readinessManager, opts...) }) provider.Register("api7ee", api7ee.New) +======= + provider.Register("apisix-standalone", + func(log logr.Logger, + statusUpdater status.Updater, + readinessManager readiness.ReadinessManager, + opts ...provider.Option, + ) (provider.Provider, error) { + opts = append(opts, provider.WithBackendMode("apisix-standalone")) + opts = append(opts, provider.WithResolveEndpoints()) + return apisix.New(log, statusUpdater, readinessManager, opts...) + }) +>>>>>>> d9550d88 (chore: unify the logging component (#2584)) } diff --git a/internal/provider/options.go b/internal/provider/options.go index 540f2e63f..379e8a0bd 100644 --- a/internal/provider/options.go +++ b/internal/provider/options.go @@ -17,7 +17,9 @@ package provider -import "time" +import ( + "time" +) type Option interface { ApplyToList(*Options) diff --git a/internal/provider/register.go b/internal/provider/register.go index 25cc670dc..fddb1af51 100644 --- a/internal/provider/register.go +++ b/internal/provider/register.go @@ -23,13 +23,14 @@ import ( "github.com/apache/apisix-ingress-controller/internal/controller/status" "github.com/apache/apisix-ingress-controller/internal/manager/readiness" + "github.com/go-logr/logr" ) type RegisterHandler interface { Register(pathPrefix string, mux *http.ServeMux) } -type RegisterFunc func(status.Updater, readiness.ReadinessManager, ...Option) (Provider, error) +type RegisterFunc func(logr.Logger, status.Updater, readiness.ReadinessManager, ...Option) (Provider, error) var providers = map[string]RegisterFunc{} @@ -45,10 +46,16 @@ func Get(name string) (RegisterFunc, error) { return f, nil } -func New(providerType string, updater status.Updater, readinesser readiness.ReadinessManager, opts ...Option) (Provider, error) { +func New( + providerType string, + log logr.Logger, + updater status.Updater, + readinesser readiness.ReadinessManager, + opts ...Option, +) (Provider, error) { f, err := Get(providerType) if err != nil { return nil, fmt.Errorf("failed to get provider %q: %w", providerType, err) } - return f(updater, readinesser, opts...) + return f(log, updater, readinesser, opts...) }